/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.broker;

import com.google.common.collect.Lists;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.coldctr.ColdDataCgCtrService;
import org.apache.rocketmq.broker.coldctr.ColdDataPullRequestHoldService;
import org.apache.rocketmq.broker.controller.ReplicasManager;
import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
import org.apache.rocketmq.broker.failover.EscapeBridge;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
import org.apache.rocketmq.broker.offset.BroadcastOffsetManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.RocksDBConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.RocksDBLmqConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.processor.AckMessageProcessor;
import org.apache.rocketmq.broker.processor.AdminBrokerProcessor;
import org.apache.rocketmq.broker.processor.ChangeInvisibleTimeProcessor;
import org.apache.rocketmq.broker.processor.ClientManageProcessor;
import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
import org.apache.rocketmq.broker.processor.NotificationProcessor;
import org.apache.rocketmq.broker.processor.PeekMessageProcessor;
import org.apache.rocketmq.broker.processor.PollingInfoProcessor;
import org.apache.rocketmq.broker.processor.PopInflightMessageCounter;
import org.apache.rocketmq.broker.processor.PopMessageProcessor;
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor;
import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
import org.apache.rocketmq.broker.processor.ReplyMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.RocksDBLmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
import org.apache.rocketmq.broker.topic.RocksDBLmqTopicConfigManager;
import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.HookUtils;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.stats.MomentStatsItem;
import org.apache.rocketmq.common.utils.ServiceProvider;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.Configuration;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.BrokerSyncInfo;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo;
import org.apache.rocketmq.srvutil.FileWatchService;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.StoreType;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.hook.PutMessageHook;
import org.apache.rocketmq.store.hook.SendMessageBackHook;
import org.apache.rocketmq.store.plugin.MessageStoreFactory;
import org.apache.rocketmq.store.plugin.MessageStorePluginContext;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.stats.LmqBrokerStatsManager;
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.TimerMetrics;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

public class BrokerController {
    // Loggers bound to the broker, protection and water-mark logger names respectively.
    protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
    private static final Logger LOG_WATER_MARK = LoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME);
    // NOTE(review): presumably the minimum accepted length of an HA address string; its use is outside this section.
    protected static final int HA_ADDRESS_MIN_LENGTH = 6;

    /**
     * Broker configuration.
     */
    protected final BrokerConfig brokerConfig;

    /**
     * Netty server configuration. May be {@code null}: the two-argument constructor passes {@code null}.
     */
    private final NettyServerConfig nettyServerConfig;

    /**
     * Netty client configuration. May be {@code null}: the two-argument constructor passes {@code null}.
     */
    private final NettyClientConfig nettyClientConfig;

    /**
     * Message store configuration.
     */
    protected final MessageStoreConfig messageStoreConfig;

    /**
     * Consumer offset manager. The constructor picks a RocksDB-backed and/or LMQ variant depending on
     * the configuration.
     */
    protected final ConsumerOffsetManager consumerOffsetManager;

    /**
     * Broadcast offset manager.
     */
    protected final BroadcastOffsetManager broadcastOffsetManager;

    /**
     * Consumer manager.
     */
    protected final ConsumerManager consumerManager;

    /**
     * Consumer filter manager.
     */
    protected final ConsumerFilterManager consumerFilterManager;

    /**
     * Consumer order info manager.
     */
    protected final ConsumerOrderInfoManager consumerOrderInfoManager;

    /**
     * Counter for in-flight pop messages (named after its type; exact semantics live in
     * {@link PopInflightMessageCounter}).
     */
    protected final PopInflightMessageCounter popInflightMessageCounter;

    /**
     * Producer manager.
     */
    protected final ProducerManager producerManager;

    /**
     * Schedule message service.
     */
    protected final ScheduleMessageService scheduleMessageService;

    /**
     * Client housekeeping service. It is also handed to both Netty remoting servers when they are created
     * in {@code initializeRemotingServer()}.
     */
    protected final ClientHousekeepingService clientHousekeepingService;

    /**
     * Pull message processor.
     */
    protected final PullMessageProcessor pullMessageProcessor;

    /**
     * Peek message processor.
     */
    protected final PeekMessageProcessor peekMessageProcessor;

    /**
     * Pop message processor.
     */
    protected final PopMessageProcessor popMessageProcessor;

    /**
     * Ack (message acknowledgement) processor.
     */
    protected final AckMessageProcessor ackMessageProcessor;

    /**
     * Change-invisible-time processor.
     */
    protected final ChangeInvisibleTimeProcessor changeInvisibleTimeProcessor;

    /**
     * Notification processor.
     */
    protected final NotificationProcessor notificationProcessor;

    /**
     * Polling info processor.
     */
    protected final PollingInfoProcessor pollingInfoProcessor;

    /**
     * Query assignment processor.
     */
    protected final QueryAssignmentProcessor queryAssignmentProcessor;

    /**
     * Client manage processor.
     */
    protected final ClientManageProcessor clientManageProcessor;

    /**
     * Send message processor.
     */
    protected final SendMessageProcessor sendMessageProcessor;

    /**
     * Reply message processor.
     */
    protected final ReplyMessageProcessor replyMessageProcessor;

    /**
     * Hold service for pull requests; an {@code LmqPullRequestHoldService} when LMQ is enabled.
     */
    protected final PullRequestHoldService pullRequestHoldService;

    /**
     * Message-arriving listener. The constructor builds it from the pull-request hold service, the pop
     * message processor and the notification processor.
     */
    protected final MessageArrivingListener messageArrivingListener;

    /**
     * Broker-to-client helper (named after its type; see {@link Broker2Client} for what it sends).
     */
    protected final Broker2Client broker2Client;

    /**
     * Listener for consumer-id changes; passed to the {@link ConsumerManager} on construction.
     */
    protected final ConsumerIdsChangeListener consumerIdsChangeListener;

    /**
     * End transaction processor.
     */
    protected final EndTransactionProcessor endTransactionProcessor;

    /**
     * Rebalance lock manager.
     */
    private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();

    /**
     * Topic route info manager.
     */
    private final TopicRouteInfoManager topicRouteInfoManager;

    /**
     * Broker outer API. The constructor creates it only when a {@link NettyClientConfig} is supplied.
     */
    protected BrokerOuterAPI brokerOuterAPI;

    /**
     * Scheduled executor of this controller; created in {@code initializeResources()} with a single thread.
     */
    protected ScheduledExecutorService scheduledExecutorService;
    // NOTE(review): presumably schedules broker-member-group synchronisation; creation is outside this section.
    protected ScheduledExecutorService syncBrokerMemberGroupExecutorService;
    // NOTE(review): presumably schedules broker heartbeats; creation is outside this section.
    protected ScheduledExecutorService brokerHeartbeatExecutorService;
    // Slave synchronisation helper, created in the constructor.
    protected final SlaveSynchronize slaveSynchronize;

    /**
     * Work queue of the send-message thread pool. Like every queue below, it is a bounded
     * {@link LinkedBlockingQueue} whose capacity is read from {@link BrokerConfig} in the constructor.
     */
    protected final BlockingQueue<Runnable> sendThreadPoolQueue;

    /**
     * Work queue of the put-message-future thread pool.
     */
    protected final BlockingQueue<Runnable> putThreadPoolQueue;
    // Work queue of the ack-message thread pool.
    protected final BlockingQueue<Runnable> ackThreadPoolQueue;
    // Work queue of the pull-message thread pool.
    protected final BlockingQueue<Runnable> pullThreadPoolQueue;
    // Work queue of the lite-pull-message thread pool.
    protected final BlockingQueue<Runnable> litePullThreadPoolQueue;
    // Queue for reply-message tasks.
    protected final BlockingQueue<Runnable> replyThreadPoolQueue;

    /**
     * Work queue of the query-message thread pool.
     */
    protected final BlockingQueue<Runnable> queryThreadPoolQueue;

    /**
     * Queue for client-manager tasks.
     */
    protected final BlockingQueue<Runnable> clientManagerThreadPoolQueue;

    /**
     * Queue for heartbeat tasks.
     */
    protected final BlockingQueue<Runnable> heartbeatThreadPoolQueue;

    /**
     * Queue for consumer-manager tasks.
     */
    protected final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;

    /**
     * Queue for end-transaction tasks.
     */
    protected final BlockingQueue<Runnable> endTransactionThreadPoolQueue;

    /**
     * Work queue of the admin-broker thread pool.
     */
    protected final BlockingQueue<Runnable> adminBrokerThreadPoolQueue;

    /**
     * Queue for load-balance tasks.
     */
    protected final BlockingQueue<Runnable> loadBalanceThreadPoolQueue;

    /**
     * Broker statistics manager; an {@code LmqBrokerStatsManager} when LMQ is enabled.
     */
    protected final BrokerStatsManager brokerStatsManager;

    /**
     * Registered send-message hooks.
     */
    protected final List<SendMessageHook> sendMessageHookList = new ArrayList<>();

    /**
     * Registered consume-message hooks.
     */
    protected final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();

    /**
     * Message store.
     */
    protected MessageStore messageStore;

    /**
     * Main remoting server; created in {@code initializeRemotingServer()} from the Netty server configuration.
     */
    protected RemotingServer remotingServer;

    /**
     * Latch associated with remoting-server start-up.
     * NOTE(review): where it is created, counted down and awaited is outside this section; confirm its exact role.
     */
    protected CountDownLatch remotingServerStartLatch;
    // Second remoting server; initializeRemotingServer() gives it a cloned config whose listen port is
    // the main port minus 2 (clamped at 0).
    protected RemotingServer fastRemotingServer;

    /**
     * Topic config manager. The constructor picks a RocksDB-backed and/or LMQ variant depending on the
     * configuration.
     */
    protected TopicConfigManager topicConfigManager;

    /**
     * Subscription group manager. The constructor picks a RocksDB-backed and/or LMQ variant depending on
     * the configuration.
     */
    protected SubscriptionGroupManager subscriptionGroupManager;

    /**
     * Topic queue mapping manager.
     */
    protected TopicQueueMappingManager topicQueueMappingManager;

    /**
     * Thread pool for send-message tasks.
     */
    protected ExecutorService sendMessageExecutor;

    /**
     * Thread pool for pull-message tasks.
     */
    protected ExecutorService pullMessageExecutor;

    /**
     * Thread pool for lite-pull-message tasks.
     */
    protected ExecutorService litePullMessageExecutor;

    /**
     * Thread pool for put-message-future tasks; backed by {@code putThreadPoolQueue}.
     */
    protected ExecutorService putMessageFutureExecutor;

    /**
     * Thread pool for ack-message tasks.
     */
    protected ExecutorService ackMessageExecutor;

    /**
     * Thread pool for reply-message tasks.
     */
    protected ExecutorService replyMessageExecutor;

    /**
     * Thread pool for query-message tasks.
     */
    protected ExecutorService queryMessageExecutor;

    /**
     * Thread pool for admin-broker tasks.
     */
    protected ExecutorService adminBrokerExecutor;

    /**
     * Thread pool for client-manage tasks.
     */
    protected ExecutorService clientManageExecutor;

    /**
     * Thread pool for heartbeat tasks.
     */
    protected ExecutorService heartbeatExecutor;

    /**
     * Thread pool for consumer-manage tasks.
     */
    protected ExecutorService consumerManageExecutor;

    /**
     * Thread pool for load-balance tasks.
     */
    protected ExecutorService loadBalanceExecutor;

    /**
     * Thread pool for end-transaction tasks.
     */
    protected ExecutorService endTransactionExecutor;

    /**
     * NOTE(review): presumably whether the master HA server address should be refreshed periodically;
     * the code that reads or sets it is outside this section. Defaults to {@code false}.
     */
    protected boolean updateMasterHAServerAddrPeriodically = false;

    /**
     * Broker statistics reference.
     */
    private BrokerStats brokerStats;
    // Store host address; the constructor sets it from the broker's IP1 and the listen port.
    private InetSocketAddress storeHost;

    /**
     * Timer message store.
     */
    private TimerMessageStore timerMessageStore;
    // Timer checkpoint.
    private TimerCheckpoint timerCheckpoint;
    // Broker fast-failure helper, created in the constructor.
    protected BrokerFastFailure brokerFastFailure;
    // Aggregated configuration; the constructor builds it from the broker config path and the four config objects.
    private Configuration configuration;
    // Topic queue mapping clean service.
    protected TopicQueueMappingCleanService topicQueueMappingCleanService;

    /**
     * File watch service.
     * NOTE(review): the original comment says it mainly watches trust-certificate, certificate and key
     * files; the watched paths are set up outside this section, so confirm.
     */
    protected FileWatchService fileWatchService;

    /**
     * Transactional message check service.
     */
    protected TransactionalMessageCheckService transactionalMessageCheckService;

    /**
     * Transactional message service.
     */
    protected TransactionalMessageService transactionalMessageService;

    /**
     * Transactional message check listener.
     */
    protected AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
    // Access validators keyed by class.
    protected Map<Class, AccessValidator> accessValidatorMap = new HashMap<>();

    /**
     * Shutdown flag of this controller.
     */
    protected volatile boolean shutdown = false;
    // Optional shutdown hook; set by the five-argument constructor.
    protected ShutdownHook shutdownHook;

    /**
     * Whether the schedule service has been started.
     */
    private volatile boolean isScheduleServiceStart = false;

    /**
     * Whether the transaction check service has been started.
     */
    private volatile boolean isTransactionCheckServiceStart = false;

    /**
     * Member group of this broker. The constructor initialises it with the cluster name and broker name
     * and registers this broker's own id and address in it.
     */
    protected volatile BrokerMemberGroup brokerMemberGroup;
    // Escape bridge, created in the constructor.
    protected EscapeBridge escapeBridge;
    // Broker-attached plugins.
    protected List<BrokerAttachedPlugin> brokerAttachedPlugins = new ArrayList<>();
    // NOTE(review): presumably the time at which the broker should start serving; set outside this section.
    protected volatile long shouldStartTime;
    // Pre-online service; the constructor creates it only when slave-acting-master is enabled and
    // pre-online is not skipped.
    private BrokerPreOnlineService brokerPreOnlineService;

    /**
     * Whether this broker is currently isolated.
     */
    protected volatile boolean isIsolated = false;

    /**
     * Minimum broker id within this broker's group.
     * NOTE(review): the original comment says 0 denotes the master and values greater than 0 denote
     * slaves; that convention is not visible in this section, so confirm.
     */
    protected volatile long minBrokerIdInGroup = 0;
    // Address of the broker holding the minimum broker id in the group.
    protected volatile String minBrokerAddrInGroup = null;

    /**
     * Mutual-exclusion lock; what it guards is outside this section.
     */
    private final Lock lock = new ReentrantLock();

    /**
     * Futures of scheduled tasks.
     */
    protected final List<ScheduledFuture<?>> scheduledFutures = new ArrayList<>();

    /**
     * Replicas manager.
     */
    protected ReplicasManager replicasManager;
    // Timestamp (ms) of the last sync; initialised to the time this instance is created.
    private long lastSyncTimeMs = System.currentTimeMillis();

    /**
     * Broker metrics manager.
     */
    private BrokerMetricsManager brokerMetricsManager;

    /**
     * Hold service for cold-data pull requests.
     */
    private ColdDataPullRequestHoldService coldDataPullRequestHoldService;

    /**
     * Cold-data consumer-group control service (named after its type; see {@link ColdDataCgCtrService}).
     */
    private ColdDataCgCtrService coldDataCgCtrService;

    /**
     * Creates a controller through the four-argument constructor and then records the shutdown hook.
     *
     * @param brokerConfig       broker configuration
     * @param nettyServerConfig  Netty server configuration
     * @param nettyClientConfig  Netty client configuration
     * @param messageStoreConfig message store configuration
     * @param shutdownHook       hook stored in {@code shutdownHook}
     */
    public BrokerController(final BrokerConfig brokerConfig, final NettyServerConfig nettyServerConfig,
            final NettyClientConfig nettyClientConfig, final MessageStoreConfig messageStoreConfig,
            final ShutdownHook shutdownHook) {
        this(brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig);
        this.shutdownHook = shutdownHook;
    }

    /**
     * Creates a controller without Netty configuration: delegates to the four-argument constructor,
     * passing {@code null} for both the Netty server and the Netty client configuration.
     *
     * @param brokerConfig       broker configuration
     * @param messageStoreConfig message store configuration
     */
    public BrokerController(final BrokerConfig brokerConfig, final MessageStoreConfig messageStoreConfig) {
        this(brokerConfig, null, null, messageStoreConfig);
    }

    /**
     * Main constructor; the other two constructors delegate to it.
     *
     * <p>Stores the configuration objects and then builds the managers, request processors, thread-pool
     * work queues and helper services of the broker. Many collaborators receive {@code this} while the
     * controller is still being constructed, so the order of the assignments below may matter to them.
     *
     * @param brokerConfig       broker configuration
     * @param nettyServerConfig  Netty server configuration ({@code null} when called from the two-argument constructor)
     * @param nettyClientConfig  Netty client configuration; when {@code null}, no {@link BrokerOuterAPI} is created here
     * @param messageStoreConfig message store configuration
     */
    public BrokerController(
            final BrokerConfig brokerConfig,
            final NettyServerConfig nettyServerConfig,
            final NettyClientConfig nettyClientConfig,
            final MessageStoreConfig messageStoreConfig
    ) {
        // Keep the configuration objects on the controller.
        this.brokerConfig = brokerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.messageStoreConfig = messageStoreConfig;

        // Store host = broker IP1 + listen port.
        this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));

        // Broker statistics manager; the LMQ variant is used when LMQ is enabled.
        this.brokerStatsManager = messageStoreConfig.isEnableLmq()
                ?
                new LmqBrokerStatsManager(
                        this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()
                )
                :
                new BrokerStatsManager(
                        this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()
                );


        // Broadcast offset manager.
        this.broadcastOffsetManager = new BroadcastOffsetManager(this);

        // Topic config, subscription group and consumer offset managers: RocksDB-backed implementations
        // when isEnableRocksDBStore() is true, and within each family the LMQ variant when LMQ is enabled.
        if (isEnableRocksDBStore()) {
            this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this);
            this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqSubscriptionGroupManager(this) : new RocksDBSubscriptionGroupManager(this);
            this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqConsumerOffsetManager(this) : new RocksDBConsumerOffsetManager(this);
        } else {
            // Otherwise use the default (non-RocksDB) implementations.
            this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
            this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
            this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
        }

        // Topic queue mapping manager.
        this.topicQueueMappingManager = new TopicQueueMappingManager(this);

        // Pull message processor.
        this.pullMessageProcessor = new PullMessageProcessor(this);

        // Peek message processor.
        this.peekMessageProcessor = new PeekMessageProcessor(this);

        // Pull-request hold service; the LMQ variant is used when LMQ is enabled.
        this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this);

        // Pop message processor.
        this.popMessageProcessor = new PopMessageProcessor(this);

        // Notification processor.
        this.notificationProcessor = new NotificationProcessor(this);

        // Polling info processor.
        this.pollingInfoProcessor = new PollingInfoProcessor(this);

        // Ack message processor.
        this.ackMessageProcessor = new AckMessageProcessor(this);

        // Change-invisible-time processor.
        this.changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(this);

        // Send message processor.
        this.sendMessageProcessor = new SendMessageProcessor(this);

        // Reply message processor.
        this.replyMessageProcessor = new ReplyMessageProcessor(this);

        // Message-arriving listener; needs the hold service and the pop/notification processors created above.
        this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, this.popMessageProcessor, this.notificationProcessor);

        // Listener for consumer-id changes.
        this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);

        // Consumer manager; needs the listener and the stats manager created above.
        this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager, this.brokerConfig);
        // Producer manager.
        this.producerManager = new ProducerManager(this.brokerStatsManager);

        // Consumer filter manager.
        this.consumerFilterManager = new ConsumerFilterManager(this);

        // Consumer order info manager.
        this.consumerOrderInfoManager = new ConsumerOrderInfoManager(this);

        // Pop in-flight message counter.
        this.popInflightMessageCounter = new PopInflightMessageCounter(this);

        // Client housekeeping service.
        this.clientHousekeepingService = new ClientHousekeepingService(this);

        // Broker-to-client helper.
        this.broker2Client = new Broker2Client(this);

        // Schedule message service.
        this.scheduleMessageService = new ScheduleMessageService(this);

        // Hold service for cold-data pull requests.
        this.coldDataPullRequestHoldService = new ColdDataPullRequestHoldService(this);

        // Cold-data consumer-group control service.
        this.coldDataCgCtrService = new ColdDataCgCtrService(this);

        // The outer API needs a Netty client config; it is skipped when none was supplied
        // (e.g. when called from the two-argument constructor).
        if (nettyClientConfig != null) {
            this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
        }

        this.queryAssignmentProcessor = new QueryAssignmentProcessor(this);
        this.clientManageProcessor = new ClientManageProcessor(this);
        this.slaveSynchronize = new SlaveSynchronize(this);
        this.endTransactionProcessor = new EndTransactionProcessor(this);

        // Bounded work queues for the thread pools; each capacity comes from the broker configuration.
        // Send-message queue.
        this.sendThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getSendThreadPoolQueueCapacity());

        // Put-message-future queue.
        this.putThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getPutThreadPoolQueueCapacity());

        // Pull-message and lite-pull-message queues.
        this.pullThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getPullThreadPoolQueueCapacity());
        this.litePullThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getLitePullThreadPoolQueueCapacity());

        // Ack-message queue.
        this.ackThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getAckThreadPoolQueueCapacity());

        // Reply-message queue.
        this.replyThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getReplyThreadPoolQueueCapacity());

        // Query-message queue.
        this.queryThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getQueryThreadPoolQueueCapacity());

        // Client-manager queue.
        this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
        // Consumer-manager queue.
        this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
        // Heartbeat queue.
        this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
        // End-transaction queue.
        this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
        // Admin-broker queue.
        this.adminBrokerThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getAdminBrokerThreadPoolQueueCapacity());
        // Load-balance queue.
        this.loadBalanceThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getLoadBalanceThreadPoolQueueCapacity());

        // Broker fast-failure helper.
        this.brokerFastFailure = new BrokerFastFailure(this);

        // Path of the broker configuration file.
        String brokerConfigPath;
        if (brokerConfig.getBrokerConfigPath() != null && !brokerConfig.getBrokerConfigPath().isEmpty()) {
            // A non-empty path was configured explicitly: use it.
            brokerConfigPath = brokerConfig.getBrokerConfigPath();
        } else {
            // Fall back to the default path provided by BrokerPathConfigHelper.
            brokerConfigPath = BrokerPathConfigHelper.getBrokerConfigPath();
        }

        // The Configuration object aggregates:
        // 1. the broker configuration file path;
        // 2. the broker configuration;
        // 3. the Netty server configuration;
        // 4. the Netty client configuration;
        // 5. the message store configuration.
        this.configuration = new Configuration(
                LOG,
                brokerConfigPath,
                this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
        );

        // Producer-online check for the stats manager ("setProduerStateGetter" is the callee's spelling).
        // If the namespace-wrapped topic exists in the topic config table, the group is looked up under
        // its namespace-wrapped name; otherwise the raw group name is used.
        this.brokerStatsManager.setProduerStateGetter(new BrokerStatsManager.StateGetter() {
            @Override
            public boolean online(String instanceId, String group, String topic) {
                if (getTopicConfigManager().getTopicConfigTable().containsKey(NamespaceUtil.wrapNamespace(instanceId, topic))) {
                    return getProducerManager().groupOnline(NamespaceUtil.wrapNamespace(instanceId, group));
                } else {
                    return getProducerManager().groupOnline(group);
                }
            }
        });
        // Consumer-online check: a consumer counts as online when subscription data is found for the
        // group/topic pair, using namespace-wrapped names if the wrapped topic exists and raw names otherwise.
        this.brokerStatsManager.setConsumerStateGetter(new BrokerStatsManager.StateGetter() {
            @Override
            public boolean online(String instanceId, String group, String topic) {
                String topicFullName = NamespaceUtil.wrapNamespace(instanceId, topic);
                if (getTopicConfigManager().getTopicConfigTable().containsKey(topicFullName)) {
                    return getConsumerManager().findSubscriptionData(NamespaceUtil.wrapNamespace(instanceId, group), topicFullName) != null;
                } else {
                    return getConsumerManager().findSubscriptionData(group, topic) != null;
                }
            }
        });

        // Broker member group, initialised with the cluster name and this broker's name.
        this.brokerMemberGroup = new BrokerMemberGroup(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName());
        // Register this broker's own id -> address in the group.
        this.brokerMemberGroup.getBrokerAddrs().put(this.brokerConfig.getBrokerId(), this.getBrokerAddr());

        this.escapeBridge = new EscapeBridge(this);

        // Topic route info manager.
        this.topicRouteInfoManager = new TopicRouteInfoManager(this);

        // The pre-online service is only created when slave-acting-master is enabled and pre-online is not skipped.
        if (this.brokerConfig.isEnableSlaveActingMaster() && !this.brokerConfig.isSkipPreOnline()) {
            this.brokerPreOnlineService = new BrokerPreOnlineService(this);
        }
    }

    /**
     * Returns the broker configuration held by this controller.
     *
     * @return the broker configuration passed to the constructor
     */
    public BrokerConfig getBrokerConfig() {
        return this.brokerConfig;
    }

    /**
     * Returns the Netty server configuration held by this controller.
     *
     * @return the Netty server configuration passed to the constructor; may be {@code null}
     */
    public NettyServerConfig getNettyServerConfig() {
        return this.nettyServerConfig;
    }

    /**
     * Returns the Netty client configuration held by this controller.
     *
     * @return the Netty client configuration passed to the constructor; may be {@code null}
     */
    public NettyClientConfig getNettyClientConfig() {
        return this.nettyClientConfig;
    }

    /**
     * Returns the work queue used for pull-message tasks.
     *
     * @return the live queue instance (not a copy)
     */
    public BlockingQueue<Runnable> getPullThreadPoolQueue() {
        return this.pullThreadPoolQueue;
    }

    /**
     * Returns the work queue used for query-message tasks.
     *
     * @return the live queue instance (not a copy)
     */
    public BlockingQueue<Runnable> getQueryThreadPoolQueue() {
        return this.queryThreadPoolQueue;
    }

    /**
     * Returns the broker metrics manager.
     *
     * @return the current value of the {@code brokerMetricsManager} field, which the constructor does not assign
     */
    public BrokerMetricsManager getBrokerMetricsManager() {
        return this.brokerMetricsManager;
    }

    /**
     * Creates the two Netty remoting servers of this broker.
     *
     * <p>The main server uses the Netty server configuration as is. The second ("fast") server uses a
     * clone of that configuration whose listen port is the main port minus 2, or 0 if that would be negative.
     * Both servers receive the client housekeeping service.
     *
     * @throws CloneNotSupportedException if the Netty server configuration cannot be cloned
     */
    protected void initializeRemotingServer() throws CloneNotSupportedException {
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);

        final NettyServerConfig fastServerConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        // Two below the main listen port, never negative.
        final int fastListenPort = Math.max(0, this.nettyServerConfig.getListenPort() - 2);
        fastServerConfig.setListenPort(fastListenPort);

        this.fastRemotingServer = new NettyRemotingServer(fastServerConfig, this.clientHousekeepingService);
    }

    /**
     * Creates the scheduled executors, the request-processing thread pools and the
     * {@code topicQueueMappingCleanService} used by this controller.
     * <p>
     * Every request-processing pool is created with its core size equal to its maximum size
     * (both read from {@code brokerConfig}), a keep-alive of {@code 1000 * 60} milliseconds,
     * the matching work-queue field of this controller, and a {@code ThreadFactoryImpl} whose
     * name prefix identifies the pool.
     */
    protected void initializeResources() {
        this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
                new ThreadFactoryImpl("BrokerControllerScheduledThread", true, getBrokerIdentity()));

        this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
                this.brokerConfig.getSendMessageThreadPoolNums(),
                this.brokerConfig.getSendMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.sendThreadPoolQueue,
                new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));

        this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
                this.brokerConfig.getPullMessageThreadPoolNums(),
                this.brokerConfig.getPullMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.pullThreadPoolQueue,
                new ThreadFactoryImpl("PullMessageThread_", getBrokerIdentity()));

        this.litePullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
                this.brokerConfig.getLitePullMessageThreadPoolNums(),
                this.brokerConfig.getLitePullMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.litePullThreadPoolQueue,
                new ThreadFactoryImpl("LitePullMessageThread_", getBrokerIdentity()));

        // Use a dedicated prefix: this pool previously reused "SendMessageThread_", which made
        // its threads indistinguishable from those of sendMessageExecutor in thread dumps.
        this.putMessageFutureExecutor = ThreadUtils.newThreadPoolExecutor(
                this.brokerConfig.getPutMessageFutureThreadPoolNums(),
                this.brokerConfig.getPutMessageFutureThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.putThreadPoolQueue,
                new ThreadFactoryImpl("PutMessageFutureThread_", getBrokerIdentity()));

        this.ackMessageExecutor = ThreadUtils.newThreadPoolExecutor(
                this.brokerConfig.getAckMessageThreadPoolNums(),
                this.brokerConfig.getAckMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.ackThreadPoolQueue,
                new ThreadFactoryImpl("AckMessageThread_", getBrokerIdentity()));

        this.queryMessageExecutor = ThreadUtils.newThreadPoolExecutor(
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.queryThreadPoolQueue,
                new ThreadFactoryImpl("QueryMessageThread_", getBrokerIdentity()));

        this.adminBrokerExecutor = ThreadUtils.newThreadPoolExecutor(
                this.brokerConfig.getAdminBrokerThreadPoolNums(),
                this.brokerConfig.getAdminBrokerThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.adminBrokerThreadPoolQueue,
                new ThreadFactoryImpl("AdminBrokerThread_", getBrokerIdentity()));

        this.clientManageExecutor = ThreadUtils.newThreadPoolExecutor(
                this.brokerConfig.getClientManageThreadPoolNums(),
                this.brokerConfig.getClientManageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.clientManagerThreadPoolQueue,
                new ThreadFactoryImpl("ClientManageThread_", getBrokerIdentity()));

        this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor(
                this.brokerConfig.getHeartbeatThreadPoolNums(),
                this.brokerConfig.getHeartbeatThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.heartbeatThreadPoolQueue,
                new ThreadFactoryImpl("HeartbeatThread_", true, getBrokerIdentity()));

        this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
                this.brokerConfig.getConsumerManageThreadPoolNums(),
                this.brokerConfig.getConsumerManageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.consumerManagerThreadPoolQueue,
                new ThreadFactoryImpl("ConsumerManageThread_", true, getBrokerIdentity()));

        this.replyMessageExecutor = ThreadUtils.newThreadPoolExecutor(
                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.replyThreadPoolQueue,
                new ThreadFactoryImpl("ProcessReplyMessageThread_", getBrokerIdentity()));

        this.endTransactionExecutor = ThreadUtils.newThreadPoolExecutor(
                this.brokerConfig.getEndTransactionThreadPoolNums(),
                this.brokerConfig.getEndTransactionThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.endTransactionThreadPoolQueue,
                new ThreadFactoryImpl("EndTransactionThread_", getBrokerIdentity()));

        this.loadBalanceExecutor = ThreadUtils.newThreadPoolExecutor(
                this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
                this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.loadBalanceThreadPoolQueue,
                new ThreadFactoryImpl("LoadBalanceProcessorThread_", getBrokerIdentity()));

        // Single-threaded schedulers for broker member group sync and broker heartbeat.
        this.syncBrokerMemberGroupExecutorService = ThreadUtils.newScheduledThreadPool(1,
                new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", getBrokerIdentity()));
        this.brokerHeartbeatExecutorService = ThreadUtils.newScheduledThreadPool(1,
                new ThreadFactoryImpl("BrokerControllerHeartbeatScheduledThread", getBrokerIdentity()));

        this.topicQueueMappingCleanService = new TopicQueueMappingCleanService(this);
    }

    /**
     * Schedules the broker's recurring background tasks on {@code scheduledExecutorService}.
     * <p>
     * Always scheduled (each task catches and logs any {@link Throwable}):
     * <ul>
     * <li>broker stats recording: first run after the delay up to the time returned by
     * {@code UtilAll.computeNextMorningTimeMillis()}, then once a day;</li>
     * <li>consumer offset persistence: after 10 s, then every
     * {@code brokerConfig.getFlushConsumerOffsetInterval()} ms;</li>
     * <li>consumer filter and consumer order info persistence: every 10 s;</li>
     * <li>{@code protectBroker()}: every 3 minutes;</li>
     * <li>{@code printWaterMark()}: after 10 s, then every second;</li>
     * <li>logging of {@code dispatchBehindBytes()}: after 10 s, then every 60 s.</li>
     * </ul>
     * When none of DLedger commit log, duplication or controller mode is enabled, a slave
     * additionally schedules the periodic slave sync task, while any other role schedules
     * {@code printMasterAndSlaveDiff()}. In controller mode the
     * {@code updateMasterHAServerAddrPeriodically} flag is set to {@code true}.
     */
    protected void initializeBrokerScheduledTasks() {
        // Delay until the next "morning" instant, then repeat once per day.
        final long delayToNextMorning = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
        final long oneDayMillis = TimeUnit.DAYS.toMillis(1);
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                getBrokerStats().record();
            } catch (Throwable t) {
                LOG.error("BrokerController: failed to record broker stats", t);
            }
        }, delayToNextMorning, oneDayMillis, TimeUnit.MILLISECONDS);

        // Persist consumer offsets.
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                this.consumerOffsetManager.persist();
            } catch (Throwable t) {
                LOG.error(
                        "BrokerController: failed to persist config file of consumerOffset", t);
            }
        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        // Persist consumer filter data and consumer order info.
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                this.consumerFilterManager.persist();
                this.consumerOrderInfoManager.persist();
            } catch (Throwable t) {
                LOG.error(
                        "BrokerController: failed to persist config file of consumerFilter or consumerOrderInfo",
                        t);
            }
        }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                protectBroker();
            } catch (Throwable t) {
                LOG.error("BrokerController: failed to protectBroker", t);
            }
        }, 3, 3, TimeUnit.MINUTES);

        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                printWaterMark();
            } catch (Throwable t) {
                LOG.error("BrokerController: failed to print broker watermark", t);
            }
        }, 10, 1, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                LOG.info("Dispatch task fall behind commit log {}bytes",
                        getMessageStore().dispatchBehindBytes());
            } catch (Throwable t) {
                LOG.error("Failed to print dispatchBehindBytes", t);
            }
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

        // The tasks below only apply when none of DLedger, duplication or controller mode is on.
        final boolean specialModeEnabled = messageStoreConfig.isEnableDLegerCommitLog()
                || messageStoreConfig.isDuplicationEnable()
                || brokerConfig.isEnableControllerMode();
        if (!specialModeEnabled) {
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                // A sufficiently long configured HA master address is pushed to the store once;
                // otherwise the address is flagged for periodic updates.
                final String haMasterAddress = this.messageStoreConfig.getHaMasterAddress();
                final boolean haMasterConfigured =
                        haMasterAddress != null && haMasterAddress.length() >= HA_ADDRESS_MIN_LENGTH;
                if (haMasterConfigured) {
                    this.messageStore.updateHaMasterAddress(haMasterAddress);
                }
                this.updateMasterHAServerAddrPeriodically = !haMasterConfigured;

                this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                    try {
                        // Full sync only when more than 60 s have passed since the previous one.
                        if (System.currentTimeMillis() - lastSyncTimeMs > 60 * 1000) {
                            getSlaveSynchronize().syncAll();
                            lastSyncTimeMs = System.currentTimeMillis();
                        }

                        //timer checkpoint, latency-sensitive, so sync it more frequently
                        if (messageStoreConfig.isTimerWheelEnable()) {
                            getSlaveSynchronize().syncTimerCheckPoint();
                        }
                    } catch (Throwable t) {
                        LOG.error("Failed to sync all config for slave.", t);
                    }
                }, 1000 * 10, 3 * 1000, TimeUnit.MILLISECONDS);
            } else {
                this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                    try {
                        printMasterAndSlaveDiff();
                    } catch (Throwable t) {
                        LOG.error("Failed to print diff of master and slave.", t);
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }

        if (this.brokerConfig.isEnableControllerMode()) {
            this.updateMasterHAServerAddrPeriodically = true;
        }
    }

    /**
     * Initializes all scheduled tasks of this controller.
     * <p>
     * Runs {@link #initializeBrokerScheduledTasks()} first. Then, if a name server address is
     * configured, it is applied immediately and re-applied every two minutes (first run after
     * 10 s). Otherwise, if fetching from an address server is enabled, the name server address
     * is fetched every {@code brokerConfig.getFetchNamesrvAddrInterval()} ms (first run after
     * 10 s).
     */
    protected void initializeScheduledTasks() {
        initializeBrokerScheduledTasks();

        if (this.brokerConfig.getNamesrvAddr() != null) {
            // An explicit name server address is configured: apply it now ...
            this.updateNamesrvAddr();
            LOG.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());

            // ... and keep re-applying it periodically.
            this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                try {
                    updateNamesrvAddr();
                } catch (Throwable t) {
                    LOG.error("Failed to update nameServer address list", t);
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            return;
        }

        if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
            // No explicit address: periodically fetch the name server address instead.
            this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                try {
                    this.brokerOuterAPI.fetchNameServerAddr();
                } catch (Throwable t) {
                    LOG.error("Failed to fetch nameServer address", t);
                }
            }, 1000 * 10, this.brokerConfig.getFetchNamesrvAddrInterval(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Pushes the configured name server address to {@code brokerOuterAPI}.
     * <p>
     * When DNS lookup is enabled in {@code brokerConfig}, the address is passed to
     * {@code updateNameServerAddressListByDnsLookup}; otherwise it is passed to
     * {@code updateNameServerAddressList}.
     */
    private void updateNamesrvAddr() {
        if (!this.brokerConfig.isFetchNameSrvAddrByDnsLookup()) {
            // Plain address list, no DNS resolution requested.
            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
            return;
        }
        this.brokerOuterAPI.updateNameServerAddressListByDnsLookup(this.brokerConfig.getNamesrvAddr());
    }

    /**
     * Loads the broker metadata by calling {@code load()} on each manager, in this order:
     * <ol>
     * <li>topic configuration</li>
     * <li>topic queue mapping</li>
     * <li>consumer offsets</li>
     * <li>subscription groups</li>
     * <li>consumer filter data</li>
     * <li>consumer order info</li>
     * </ol>
     * Loading stops at the first manager whose {@code load()} returns {@code false}.
     *
     * @return {@code true} only if every manager loaded successfully
     */
    public boolean initializeMetadata() {
        // Short-circuit evaluation: later managers are not loaded once one has failed.
        return this.topicConfigManager.load()
                && this.topicQueueMappingManager.load()
                && this.consumerOffsetManager.load()
                && this.subscriptionGroupManager.load()
                && this.consumerFilterManager.load()
                && this.consumerOrderInfoManager.load();
    }

    /**
     * Builds the message store of this broker and, when the timer wheel is enabled, the timer
     * message store attached to it.
     * <p>
     * A {@code DefaultMessageStore} is created first, optionally wired to a DLedger role change
     * handler, and then passed through {@code MessageStoreFactory.build} to obtain the store
     * that is kept in {@code messageStore}.
     *
     * @return {@code true} on success, {@code false} if an {@link IOException} was raised
     */
    public boolean initializeMessageStore() {
        try {
            final DefaultMessageStore store = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,
                    this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());

            // With a DLedger commit log, register a handler for role changes of the leader elector.
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                final DLedgerRoleChangeHandler handler = new DLedgerRoleChangeHandler(this, store);
                final DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) store.getCommitLog();
                dLedgerCommitLog.getdLedgerServer().getDLedgerLeaderElector().addRoleChangeHandler(handler);
            }

            this.brokerStats = new BrokerStats(store);

            // Let the store factory build the final store from the default one and the plugin context.
            final MessageStorePluginContext pluginContext = new MessageStorePluginContext(
                    messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig, configuration);
            this.messageStore = MessageStoreFactory.build(pluginContext, store);

            // The bitmap-calculating dispatcher goes to the head of the dispatcher list.
            this.messageStore.getDispatcherList().addFirst(
                    new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));

            if (messageStoreConfig.isTimerWheelEnable()) {
                final String storeRootDir = messageStoreConfig.getStorePathRootDir();
                this.timerCheckpoint = new TimerCheckpoint(BrokerPathConfigHelper.getTimerCheckPath(storeRootDir));
                final TimerMetrics metrics = new TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(storeRootDir));
                this.timerMessageStore = new TimerMessageStore(
                        messageStore, messageStoreConfig, timerCheckpoint, metrics, brokerStatsManager);
                // Messages handed to the hook are forwarded to the escape bridge.
                this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));
                this.messageStore.setTimerMessageStore(this.timerMessageStore);
            }
            return true;
        } catch (IOException e) {
            LOG.error("BrokerController#initialize: unexpected error occurs", e);
            return false;
        }
    }

    /**
     * Initializes this broker controller in three steps: metadata, message store, then
     * recovery and service initialization. A failing step aborts the remaining ones.
     *
     * @return {@code true} if all three steps succeeded, {@code false} otherwise
     * @throws CloneNotSupportedException propagated from {@link #recoverAndInitService()}
     */
    public boolean initialize() throws CloneNotSupportedException {
        // Short-circuit: each step runs only if the previous one returned true.
        return this.initializeMetadata()
                && this.initializeMessageStore()
                && this.recoverAndInitService();
    }

    /**
     * Loads the stores and plugins and, if everything loaded, initializes the remoting servers,
     * executors, processors, scheduled tasks, transaction services, ACL and RPC hooks.
     * <p>
     * When TLS is not disabled, a {@code FileWatchService} is also created to reload the SSL
     * context of both remoting servers when the watched certificate files change.
     *
     * @return {@code true} if every load step succeeded and the file watch service (when
     *         required) could be created; {@code false} otherwise
     * @throws CloneNotSupportedException propagated from {@link #initializeRemotingServer()}
     */
    public boolean recoverAndInitService() throws CloneNotSupportedException {

        boolean result = true;

        // In controller mode, create the replicas manager and start it in the fenced state.
        if (this.brokerConfig.isEnableControllerMode()) {
            this.replicasManager = new ReplicasManager(this);
            this.replicasManager.setFenced(true);
        }

        // Register the store hooks, then load the message store.
        if (messageStore != null) {
            registerMessageStoreHook();
            result = this.messageStore.load();
        }

        // Load the timer message store when the timer wheel is enabled.
        if (messageStoreConfig.isTimerWheelEnable()) {
            result = result && this.timerMessageStore.load();
        }

        // Load the schedule message service.
        result = result && this.scheduleMessageService.load();

        // Load every attached plugin; loading stops being attempted once a step has failed.
        for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) {
            if (brokerAttachedPlugin != null) {
                result = result && brokerAttachedPlugin.load();
            }
        }

        this.brokerMetricsManager = new BrokerMetricsManager(this);

        if (result) {
            // Remoting servers must exist before processors are registered on them.
            initializeRemotingServer();
            // Executors must exist before processors are registered with them.
            initializeResources();
            registerProcessor();
            initializeScheduledTasks();
            initialTransaction();
            initialAcl();
            initialRpcHooks();

            if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                // Register a listener to reload SslContext
                try {
                    fileWatchService = new FileWatchService(
                            new String[]{
                                    TlsSystemConfig.tlsServerCertPath,
                                    TlsSystemConfig.tlsServerKeyPath,
                                    TlsSystemConfig.tlsServerTrustCertPath
                            },
                            new FileWatchService.Listener() {
                                // Set when the certificate / private key file has changed since the last reload.
                                boolean certChanged;
                                boolean keyChanged;

                                @Override
                                public void onChanged(String path) {
                                    // A trust certificate change triggers a reload on its own.
                                    if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                        LOG.info("The trust certificate changed, reload the ssl context");
                                        reloadServerSslContext();
                                    }
                                    if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                        certChanged = true;
                                    }
                                    if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                        keyChanged = true;
                                    }
                                    // Certificate and key are reloaded only once both have changed.
                                    if (certChanged && keyChanged) {
                                        LOG.info("The certificate and private key changed, reload the ssl context");
                                        certChanged = keyChanged = false;
                                        reloadServerSslContext();
                                    }
                                }

                                private void reloadServerSslContext() {
                                    ((NettyRemotingServer) remotingServer).loadSslContext();
                                    ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                                }
                            });
                } catch (Exception e) {
                    result = false;
                    // Keep the cause in the log; without it the failure cannot be diagnosed.
                    LOG.warn("FileWatchService created error, can't load the certificate dynamically", e);
                }
            }
        }

        return result;
    }

    /**
     * Registers the put-message hooks and the send-message-back hook on {@code messageStore}.
     * <p>
     * Three put-message hooks are appended, in this order: {@code checkBeforePutMessage},
     * {@code innerBatchChecker} and {@code handleScheduleMessage}. Each delegates to the
     * matching {@code HookUtils} method. Does nothing when {@code messageStore} is
     * {@code null}.
     */
    public void registerMessageStoreHook() {
        // Guard before the first dereference; previously the null check came after
        // messageStore had already been used and therefore could never take effect.
        if (messageStore == null) {
            return;
        }

        List<PutMessageHook> putMessageHookList = messageStore.getPutMessageHookList();

        putMessageHookList.add(new PutMessageHook() {
            @Override
            public String hookName() {
                return "checkBeforePutMessage";
            }

            @Override
            public PutMessageResult executeBeforePutMessage(MessageExt msg) {
                return HookUtils.checkBeforePutMessage(BrokerController.this, msg);
            }
        });

        putMessageHookList.add(new PutMessageHook() {
            @Override
            public String hookName() {
                return "innerBatchChecker";
            }

            @Override
            public PutMessageResult executeBeforePutMessage(MessageExt msg) {
                // Only broker-inner messages are checked; anything else passes through.
                if (msg instanceof MessageExtBrokerInner) {
                    return HookUtils.checkInnerBatch(BrokerController.this, msg);
                }
                return null;
            }
        });

        // Schedule (delayed) message handling.
        putMessageHookList.add(new PutMessageHook() {
            @Override
            public String hookName() {
                return "handleScheduleMessage";
            }

            @Override
            public PutMessageResult executeBeforePutMessage(MessageExt msg) {
                if (msg instanceof MessageExtBrokerInner) {
                    return HookUtils.handleScheduleMessage(BrokerController.this, (MessageExtBrokerInner) msg);
                }
                return null;
            }
        });

        SendMessageBackHook sendMessageBackHook = new SendMessageBackHook() {
            @Override
            public boolean executeSendMessageBack(List<MessageExt> msgList, String brokerName, String brokerAddr) {
                return HookUtils.sendMessageBack(BrokerController.this, msgList, brokerName, brokerAddr);
            }
        };

        messageStore.setSendMessageBackHook(sendMessageBackHook);
    }

    /**
     * Initializes the transactional message service, its check listener and its check service.
     * <p>
     * Both the service and the listener are first looked up through {@code ServiceProvider};
     * when nothing is found, the default implementations are used and a warning is logged.
     */
    private void initialTransaction() {
        // Transactional message service: provider-supplied or default.
        TransactionalMessageService messageService = ServiceProvider.loadClass(TransactionalMessageService.class);
        if (messageService == null) {
            messageService = new TransactionalMessageServiceImpl(
                    new TransactionalMessageBridge(this, this.getMessageStore()));
            LOG.warn("Load default transaction message hook service: {}",
                    TransactionalMessageServiceImpl.class.getSimpleName());
        }
        this.transactionalMessageService = messageService;

        // Transactional message check listener: provider-supplied or default.
        AbstractTransactionalMessageCheckListener checkListener =
                ServiceProvider.loadClass(AbstractTransactionalMessageCheckListener.class);
        if (checkListener == null) {
            checkListener = new DefaultTransactionalMessageCheckListener();
            LOG.warn("Load default discard message hook service: {}",
                    DefaultTransactionalMessageCheckListener.class.getSimpleName());
        }
        this.transactionalMessageCheckListener = checkListener;
        // The listener needs a reference back to this controller.
        this.transactionalMessageCheckListener.setBrokerController(this);

        this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
    }

    /**
     * Installs the access validators as server RPC hooks when ACL is enabled.
     * <p>
     * Validators are loaded through {@code ServiceProvider}; if none is found, a
     * {@code PlainAccessValidator} is used. Each validator is recorded in
     * {@code accessValidatorMap} under its class and wrapped in an {@code RPCHook} that
     * validates every request before it is processed.
     */
    private void initialAcl() {
        if (!this.brokerConfig.isAclEnable()) {
            LOG.info("The broker dose not enable acl");
            return;
        }

        final List<AccessValidator> validators = ServiceProvider.load(AccessValidator.class);
        if (validators.isEmpty()) {
            LOG.info("ServiceProvider loaded no AccessValidator, using default org.apache.rocketmq.acl.plain.PlainAccessValidator");
            validators.add(new PlainAccessValidator());
        }

        for (final AccessValidator validator : validators) {
            accessValidatorMap.put(validator.getClass(), validator);

            final RPCHook aclHook = new RPCHook() {

                @Override
                public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                    // Deliberately not caught: a validation failure must propagate to the caller.
                    validator.validate(validator.parse(request, remoteAddr));
                }

                @Override
                public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
                    // Nothing to do after the response.
                }

            };
            this.registerServerRPCHook(aclHook);
        }
    }

    /**
     * Registers every {@code RPCHook} found through {@code ServiceProvider} as a server RPC
     * hook. Does nothing when no hook is found.
     */
    private void initialRpcHooks() {
        final List<RPCHook> discoveredHooks = ServiceProvider.load(RPCHook.class);
        if (discoveredHooks != null && !discoveredHooks.isEmpty()) {
            for (RPCHook hook : discoveredHooks) {
                this.registerServerRPCHook(hook);
            }
        }
    }

    /**
     * 在broker控制器初始化的时候，要注册各种处理器
     */
    public void registerProcessor() {
        /*
         * SendMessageProcessor
         */
        sendMessageProcessor.registerSendMessageHook(sendMessageHookList);
        sendMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

        // 注册发送消息处理器，包括请求代码SEND_MESSAGE，发送消息处理器以及执行该任务的线程池。
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
        // 注册发送消息处理器，包括请求代码SEND_MESSAGE，发送消息处理器以及执行该任务的线程池。
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
        /**
         * PullMessageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.LITE_PULL_MESSAGE, this.pullMessageProcessor, this.litePullMessageExecutor);
        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
        /**
         * PeekMessageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.PEEK_MESSAGE, this.peekMessageProcessor, this.pullMessageExecutor);
        /**
         * PopMessageProcessor
         * 请求码为POP_MESSAGE时，调用popMessageProcessor的处理方法。
         */
        this.remotingServer.registerProcessor(RequestCode.POP_MESSAGE, this.popMessageProcessor, this.pullMessageExecutor);

        /**
         * AckMessageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);

        this.remotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);
        /**
         * ChangeInvisibleTimeProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
        /**
         * notificationProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.NOTIFICATION, this.notificationProcessor, this.pullMessageExecutor);

        /**
         * pollingInfoProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.POLLING_INFO, this.pollingInfoProcessor, this.pullMessageExecutor);

        /**
         * ReplyMessageProcessor
         */

        replyMessageProcessor.registerSendMessageHook(sendMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);

        /**
         * QueryMessageProcessor
         */
        NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

        /**
         * ClientManageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);

        this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);

        /**
         * ConsumerManageProcessor
         */
        ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

        this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

        /**
         * QueryAssignmentProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, queryAssignmentProcessor, loadBalanceExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, queryAssignmentProcessor, loadBalanceExecutor);
        this.remotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, queryAssignmentProcessor, loadBalanceExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, queryAssignmentProcessor, loadBalanceExecutor);

        /*
         * Register the processor that handles end-transaction requests.
         * Arguments:
         *    request code;
         *    processor;
         *    executor (thread pool).
         */
        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);

        /*
         * Default
         */
        AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
        this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
        this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    }

    /**
     * Returns the broker statistics object held by this controller.
     *
     * @return the current {@code brokerStats} reference
     */
    public BrokerStats getBrokerStats() {
        return this.brokerStats;
    }

    /**
     * Replaces the broker statistics object held by this controller.
     *
     * @param brokerStats the statistics object to store
     */
    public void setBrokerStats(BrokerStats brokerStats) {
        this.brokerStats = brokerStats;
    }

    /**
     * Disables consumption for every consumer group whose recorded fall-behind size exceeds
     * {@code consumerFallbehindThreshold}.
     * <p>
     * Does nothing unless {@code disableConsumeIfConsumerReadSlowly} is enabled in the broker config.
     */
    public void protectBroker() {
        if (!this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {
            return;
        }

        for (MomentStatsItem statsItem : this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().values()) {
            final long behindBytes = statsItem.getValue().get();
            if (behindBytes <= this.brokerConfig.getConsumerFallbehindThreshold()) {
                continue;
            }

            // The third '@'-separated segment of the stats key is used as the consumer group name.
            final String consumerGroup = statsItem.getStatsKey().split("@")[2];
            LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", consumerGroup, behindBytes);
            this.subscriptionGroupManager.disableConsume(consumerGroup);
        }
    }

    /**
     * Computes how long the task at the head of the given queue has been waiting.
     *
     * @param q the executor queue to inspect; only its head is peeked, nothing is removed
     * @return the message store's current time minus the head task's creation timestamp, in milliseconds;
     *         {@code 0} if the queue is empty, the head cannot be cast to a {@code RequestTask},
     *         or the difference is negative
     */
    public long headSlowTimeMills(BlockingQueue<Runnable> q) {
        final Runnable head = q.peek();
        if (head == null) {
            return 0;
        }

        final RequestTask task = BrokerFastFailure.castRunnable(head);
        if (task == null) {
            return 0;
        }

        final long waited = this.messageStore.now() - task.getCreateTimestamp();
        // Never report a negative wait.
        return Math.max(0L, waited);
    }

    /**
     * Returns the waiting time of the head task in the send thread-pool queue.
     *
     * @return the result of {@link #headSlowTimeMills(BlockingQueue)} for {@code sendThreadPoolQueue}
     */
    public long headSlowTimeMills4SendThreadPoolQueue() {
        return headSlowTimeMills(this.sendThreadPoolQueue);
    }

    /**
     * Returns the waiting time of the head task in the pull thread-pool queue.
     *
     * @return the result of {@link #headSlowTimeMills(BlockingQueue)} for {@code pullThreadPoolQueue}
     */
    public long headSlowTimeMills4PullThreadPoolQueue() {
        return headSlowTimeMills(this.pullThreadPoolQueue);
    }

    /**
     * Returns the waiting time of the head task in the lite-pull thread-pool queue.
     *
     * @return the result of {@link #headSlowTimeMills(BlockingQueue)} for {@code litePullThreadPoolQueue}
     */
    public long headSlowTimeMills4LitePullThreadPoolQueue() {
        return headSlowTimeMills(this.litePullThreadPoolQueue);
    }

    /**
     * Returns the waiting time of the head task in the query thread-pool queue.
     *
     * @return the result of {@link #headSlowTimeMills(BlockingQueue)} for {@code queryThreadPoolQueue}
     */
    public long headSlowTimeMills4QueryThreadPoolQueue() {
        return headSlowTimeMills(this.queryThreadPoolQueue);
    }

    /**
     * Logs, for each request queue, its current size and the waiting time of its head task.
     * <p>
     * One {@code [WATERMARK]} line is written per queue to the water-mark logger.
     */
    public void printWaterMark() {
        LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}",
            this.sendThreadPoolQueue.size(),
            headSlowTimeMills4SendThreadPoolQueue());
        LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}",
            this.pullThreadPoolQueue.size(),
            headSlowTimeMills4PullThreadPoolQueue());
        LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}",
            this.queryThreadPoolQueue.size(),
            headSlowTimeMills4QueryThreadPoolQueue());
        LOG_WATER_MARK.info("[WATERMARK] Lite Pull Queue Size: {} SlowTimeMills: {}",
            this.litePullThreadPoolQueue.size(),
            headSlowTimeMills4LitePullThreadPoolQueue());
        LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}",
            this.endTransactionThreadPoolQueue.size(),
            headSlowTimeMills(this.endTransactionThreadPoolQueue));
        LOG_WATER_MARK.info("[WATERMARK] ClientManager Queue Size: {} SlowTimeMills: {}",
            this.clientManagerThreadPoolQueue.size(),
            headSlowTimeMills(this.clientManagerThreadPoolQueue));
        LOG_WATER_MARK.info("[WATERMARK] Heartbeat Queue Size: {} SlowTimeMills: {}",
            this.heartbeatThreadPoolQueue.size(),
            headSlowTimeMills(this.heartbeatThreadPoolQueue));
        LOG_WATER_MARK.info("[WATERMARK] Ack Queue Size: {} SlowTimeMills: {}",
            this.ackThreadPoolQueue.size(),
            headSlowTimeMills(this.ackThreadPoolQueue));
        LOG_WATER_MARK.info("[WATERMARK] Admin Queue Size: {} SlowTimeMills: {}",
            this.adminBrokerThreadPoolQueue.size(),
            headSlowTimeMills(this.adminBrokerThreadPoolQueue));
    }

    /**
     * Returns the message store used by this controller.
     *
     * @return the current {@code messageStore} reference
     */
    public MessageStore getMessageStore() {
        return this.messageStore;
    }

    /**
     * Replaces the message store used by this controller.
     *
     * @param messageStore the message store to use
     */
    public void setMessageStore(MessageStore messageStore) {
        this.messageStore = messageStore;
    }

    /**
     * Logs how far the slave has fallen behind the master's commit log.
     * <p>
     * Nothing is logged when the message store has no HA service or the HA service reports no connections.
     */
    protected void printMasterAndSlaveDiff() {
        if (messageStore.getHaService() == null) {
            return;
        }
        if (messageStore.getHaService().getConnectionCount().get() <= 0) {
            return;
        }

        long behindBytes = this.messageStore.slaveFallBehindMuch();
        LOG.info("CommitLog: slave fall behind master {}bytes", behindBytes);
    }

    /**
     * Returns the broker-to-client helper held by this controller.
     *
     * @return the current {@code broker2Client} reference
     */
    public Broker2Client getBroker2Client() {
        return this.broker2Client;
    }

    /**
     * Returns the consumer manager held by this controller.
     *
     * @return the current {@code consumerManager} reference
     */
    public ConsumerManager getConsumerManager() {
        return this.consumerManager;
    }

    /**
     * Returns the consumer filter manager held by this controller.
     *
     * @return the current {@code consumerFilterManager} reference
     */
    public ConsumerFilterManager getConsumerFilterManager() {
        return this.consumerFilterManager;
    }

    /**
     * Returns the consumer order-info manager held by this controller.
     *
     * @return the current {@code consumerOrderInfoManager} reference
     */
    public ConsumerOrderInfoManager getConsumerOrderInfoManager() {
        return this.consumerOrderInfoManager;
    }

    /**
     * Returns the pop in-flight message counter held by this controller.
     *
     * @return the current {@code popInflightMessageCounter} reference
     */
    public PopInflightMessageCounter getPopInflightMessageCounter() {
        return this.popInflightMessageCounter;
    }

    /**
     * Returns the consumer offset manager held by this controller.
     *
     * @return the current {@code consumerOffsetManager} reference
     */
    public ConsumerOffsetManager getConsumerOffsetManager() {
        return this.consumerOffsetManager;
    }

    /**
     * Returns the broadcast offset manager held by this controller.
     *
     * @return the current {@code broadcastOffsetManager} reference
     */
    public BroadcastOffsetManager getBroadcastOffsetManager() {
        return this.broadcastOffsetManager;
    }

    /**
     * Returns the message store configuration held by this controller.
     *
     * @return the current {@code messageStoreConfig} reference
     */
    public MessageStoreConfig getMessageStoreConfig() {
        return this.messageStoreConfig;
    }

    /**
     * Returns the producer manager held by this controller.
     *
     * @return the current {@code producerManager} reference
     */
    public ProducerManager getProducerManager() {
        return this.producerManager;
    }

    /**
     * Replaces the fast remoting server used by this controller.
     *
     * @param fastRemotingServer the remoting server to use as the fast server
     */
    public void setFastRemotingServer(RemotingServer fastRemotingServer) {
        this.fastRemotingServer = fastRemotingServer;
    }

    /**
     * Returns the fast remoting server held by this controller.
     *
     * @return the current {@code fastRemotingServer} reference
     */
    public RemotingServer getFastRemotingServer() {
        return this.fastRemotingServer;
    }

    /**
     * Returns the pull-message processor held by this controller.
     *
     * @return the current {@code pullMessageProcessor} reference
     */
    public PullMessageProcessor getPullMessageProcessor() {
        return this.pullMessageProcessor;
    }

    /**
     * Returns the pull-request hold service held by this controller.
     *
     * @return the current {@code pullRequestHoldService} reference
     */
    public PullRequestHoldService getPullRequestHoldService() {
        return this.pullRequestHoldService;
    }

    /**
     * Replaces the subscription group manager used by this controller.
     *
     * @param subscriptionGroupManager the subscription group manager to use
     */
    public void setSubscriptionGroupManager(SubscriptionGroupManager subscriptionGroupManager) {
        this.subscriptionGroupManager = subscriptionGroupManager;
    }

    /**
     * Returns the subscription group manager held by this controller.
     *
     * @return the current {@code subscriptionGroupManager} reference
     */
    public SubscriptionGroupManager getSubscriptionGroupManager() {
        return this.subscriptionGroupManager;
    }

    /**
     * Returns the pop-message processor held by this controller.
     *
     * @return the current {@code popMessageProcessor} reference
     */
    public PopMessageProcessor getPopMessageProcessor() {
        return this.popMessageProcessor;
    }

    /**
     * Returns the notification processor held by this controller.
     *
     * @return the current {@code notificationProcessor} reference
     */
    public NotificationProcessor getNotificationProcessor() {
        return this.notificationProcessor;
    }

    /**
     * Returns the timer message store held by this controller.
     *
     * @return the current {@code timerMessageStore} reference
     */
    public TimerMessageStore getTimerMessageStore() {
        return this.timerMessageStore;
    }

    /**
     * Replaces the timer message store used by this controller.
     *
     * @param timerMessageStore the timer message store to use
     */
    public void setTimerMessageStore(TimerMessageStore timerMessageStore) {
        this.timerMessageStore = timerMessageStore;
    }

    /**
     * Returns the ack-message processor held by this controller.
     *
     * @return the current {@code ackMessageProcessor} reference
     */
    public AckMessageProcessor getAckMessageProcessor() {
        return this.ackMessageProcessor;
    }

    /**
     * Returns the change-invisible-time processor held by this controller.
     *
     * @return the current {@code changeInvisibleTimeProcessor} reference
     */
    public ChangeInvisibleTimeProcessor getChangeInvisibleTimeProcessor() {
        return this.changeInvisibleTimeProcessor;
    }

    /**
     * Shuts down the basic services owned by this controller.
     * <p>
     * Sets the {@code shutdown} flag, unregisters the broker, invokes the shutdown hook, and then stops
     * the remoting servers, background services, stores, executors and config managers in a fixed order.
     * Managers that hold state ({@code consumerFilterManager}, {@code consumerOrderInfoManager},
     * {@code scheduleMessageService}, {@code topicConfigManager}, {@code subscriptionGroupManager},
     * {@code consumerOffsetManager}) are persisted as part of the sequence.
     * <p>
     * Every component is null-checked so that a partially initialized controller can still be shut down.
     */
    protected void shutdownBasicService() {

        // Flag read by doRegisterBrokerAll to stop registering once shutdown has begun.
        shutdown = true;

        this.unregisterBrokerAll();

        if (this.shutdownHook != null) {
            this.shutdownHook.beforeShutdown(this);
        }

        if (this.remotingServer != null) {
            this.remotingServer.shutdown();
        }

        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.shutdown();
        }

        if (this.brokerMetricsManager != null) {
            this.brokerMetricsManager.shutdown();
        }

        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.shutdown();
        }

        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.shutdown();
        }

        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.shutdown();
        }

        // Guarded like the matching start calls in startBasicService: the processors may be
        // null if initialization did not complete.
        if (this.popMessageProcessor != null) {
            this.popMessageProcessor.getPopLongPollingService().shutdown();
            this.popMessageProcessor.getQueueLockManager().shutdown();
            this.popMessageProcessor.getPopBufferMergeService().shutdown();
        }

        if (this.ackMessageProcessor != null) {
            this.ackMessageProcessor.shutdownPopReviveService();
        }

        if (this.transactionalMessageService != null) {
            this.transactionalMessageService.close();
        }

        if (this.notificationProcessor != null) {
            this.notificationProcessor.getPopLongPollingService().shutdown();
        }

        if (this.consumerIdsChangeListener != null) {
            this.consumerIdsChangeListener.shutdown();
        }

        if (this.topicQueueMappingCleanService != null) {
            this.topicQueueMappingCleanService.shutdown();
        }
        //it is better to make sure the timerMessageStore shutdown firstly
        if (this.timerMessageStore != null) {
            this.timerMessageStore.shutdown();
        }
        if (this.fileWatchService != null) {
            this.fileWatchService.shutdown();
        }

        if (this.broadcastOffsetManager != null) {
            this.broadcastOffsetManager.shutdown();
        }

        if (this.messageStore != null) {
            this.messageStore.shutdown();
        }

        if (this.replicasManager != null) {
            this.replicasManager.shutdown();
        }

        shutdownScheduledExecutorService(this.scheduledExecutorService);

        if (this.sendMessageExecutor != null) {
            this.sendMessageExecutor.shutdown();
        }

        if (this.litePullMessageExecutor != null) {
            this.litePullMessageExecutor.shutdown();
        }

        if (this.pullMessageExecutor != null) {
            this.pullMessageExecutor.shutdown();
        }

        if (this.replyMessageExecutor != null) {
            this.replyMessageExecutor.shutdown();
        }

        if (this.putMessageFutureExecutor != null) {
            this.putMessageFutureExecutor.shutdown();
        }

        if (this.ackMessageExecutor != null) {
            this.ackMessageExecutor.shutdown();
        }

        if (this.adminBrokerExecutor != null) {
            this.adminBrokerExecutor.shutdown();
        }

        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.shutdown();
        }

        if (this.consumerFilterManager != null) {
            this.consumerFilterManager.persist();
        }

        if (this.consumerOrderInfoManager != null) {
            this.consumerOrderInfoManager.persist();
        }

        if (this.scheduleMessageService != null) {
            this.scheduleMessageService.persist();
            this.scheduleMessageService.shutdown();
        }

        if (this.clientManageExecutor != null) {
            this.clientManageExecutor.shutdown();
        }

        if (this.queryMessageExecutor != null) {
            this.queryMessageExecutor.shutdown();
        }

        if (this.heartbeatExecutor != null) {
            this.heartbeatExecutor.shutdown();
        }

        if (this.consumerManageExecutor != null) {
            this.consumerManageExecutor.shutdown();
        }

        if (this.transactionalMessageCheckService != null) {
            this.transactionalMessageCheckService.shutdown(false);
        }

        if (this.endTransactionExecutor != null) {
            this.endTransactionExecutor.shutdown();
        }

        if (this.escapeBridge != null) {
            escapeBridge.shutdown();
        }

        if (this.topicRouteInfoManager != null) {
            this.topicRouteInfoManager.shutdown();
        }

        if (this.brokerPreOnlineService != null && !this.brokerPreOnlineService.isStopped()) {
            this.brokerPreOnlineService.shutdown();
        }

        if (this.coldDataPullRequestHoldService != null) {
            this.coldDataPullRequestHoldService.shutdown();
        }

        if (this.coldDataCgCtrService != null) {
            this.coldDataCgCtrService.shutdown();
        }

        shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService);
        shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService);

        if (this.topicConfigManager != null) {
            this.topicConfigManager.persist();
            this.topicConfigManager.stop();
        }

        if (this.subscriptionGroupManager != null) {
            this.subscriptionGroupManager.persist();
            this.subscriptionGroupManager.stop();
        }

        if (this.consumerOffsetManager != null) {
            this.consumerOffsetManager.persist();
            this.consumerOffsetManager.stop();
        }

        for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) {
            if (brokerAttachedPlugin != null) {
                brokerAttachedPlugin.shutdown();
            }
        }
    }

    /**
     * Shuts down the broker controller.
     * <p>
     * Stops the basic services first, then cancels every tracked scheduled future (interrupting it if
     * running), and finally shuts down the outer API client when one is present.
     */
    public void shutdown() {
        shutdownBasicService();

        scheduledFutures.forEach(future -> future.cancel(true));

        if (this.brokerOuterAPI == null) {
            return;
        }
        this.brokerOuterAPI.shutdown();
    }

    /**
     * Shuts down the given scheduled executor and waits up to 5000 ms for it to terminate.
     * <p>
     * If the wait is interrupted, a warning is logged and the current thread's interrupt flag is restored.
     *
     * @param scheduledExecutorService the executor to stop; {@code null} is ignored
     */
    protected void shutdownScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
        if (scheduledExecutorService != null) {
            scheduledExecutorService.shutdown();
            try {
                scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException interrupted) {
                BrokerController.LOG.warn("shutdown ScheduledExecutorService was Interrupted!  ", interrupted);
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Unregisters this broker through the outer API, identifying it by cluster name, broker address,
     * broker name and broker id.
     * <p>
     * Does nothing when {@code brokerOuterAPI} is {@code null}. {@link #shutdown()} and {@link #start()}
     * already treat the outer API as optional, and this method runs at the start of
     * {@code shutdownBasicService()}, so an NPE here would abort the rest of the shutdown sequence.
     */
    protected void unregisterBrokerAll() {
        if (this.brokerOuterAPI == null) {
            return;
        }
        this.brokerOuterAPI.unregisterBrokerAll(
                this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId());
    }

    /**
     * Builds this broker's address as {@code <brokerIP1>:<listenPort>}.
     *
     * @return the broker IP from the broker config joined with the listen port from the netty server config
     */
    public String getBrokerAddr() {
        return new StringBuilder()
            .append(this.brokerConfig.getBrokerIP1())
            .append(":")
            .append(this.nettyServerConfig.getListenPort())
            .toString();
    }

    /**
     * Starts the basic services of the broker controller.
     * <p>
     * Components are started in a fixed order and each one is skipped when it is {@code null}.
     *
     * @throws Exception if any of the started components fails to start or the wait on the
     *                   remoting-server start latch is interrupted
     */
    protected void startBasicService() throws Exception {

        // Message store.
        if (messageStore != null) {
            messageStore.start();
        }

        // Timer message store (original comment: time-wheel based message store).
        if (timerMessageStore != null) {
            timerMessageStore.start();
        }

        // Replicas manager.
        if (replicasManager != null) {
            replicasManager.start();
        }

        // Block until the latch (if any) allows the remoting server to start.
        if (remotingServerStartLatch != null) {
            remotingServerStartLatch.await();
        }

        // Remoting server.
        if (remotingServer != null) {
            remotingServer.start();

            // In test scenarios where it is up to OS to pick up an available port, set the listening port back to config
            if (nettyServerConfig != null && nettyServerConfig.getListenPort() == 0) {
                nettyServerConfig.setListenPort(remotingServer.localListenPort());
            }
        }

        if (fastRemotingServer != null) {
            fastRemotingServer.start();
        }

        // Record the store host now that the listen port is final.
        this.storeHost = new InetSocketAddress(getBrokerConfig().getBrokerIP1(), getNettyServerConfig().getListenPort());

        for (BrokerAttachedPlugin plugin : brokerAttachedPlugins) {
            if (plugin == null) {
                continue;
            }
            plugin.start();
        }

        if (popMessageProcessor != null) {
            popMessageProcessor.getPopLongPollingService().start();
            popMessageProcessor.getPopBufferMergeService().start();
            popMessageProcessor.getQueueLockManager().start();
        }

        if (ackMessageProcessor != null) {
            ackMessageProcessor.startPopReviveService();
        }

        if (notificationProcessor != null) {
            notificationProcessor.getPopLongPollingService().start();
        }

        // Cleanup service for topic-queue mappings.
        if (topicQueueMappingCleanService != null) {
            topicQueueMappingCleanService.start();
        }

        // File watch service (original comment: mainly watches server certificate, key and
        // trusted-certificate changes).
        if (fileWatchService != null) {
            fileWatchService.start();
        }

        if (pullRequestHoldService != null) {
            pullRequestHoldService.start();
        }

        // Client housekeeping.
        if (clientHousekeepingService != null) {
            clientHousekeepingService.start();
        }

        // Broker statistics manager.
        if (brokerStatsManager != null) {
            brokerStatsManager.start();
        }

        if (brokerFastFailure != null) {
            brokerFastFailure.start();
        }

        if (broadcastOffsetManager != null) {
            broadcastOffsetManager.start();
        }

        if (escapeBridge != null) {
            escapeBridge.start();
        }

        // Topic route info manager.
        if (topicRouteInfoManager != null) {
            topicRouteInfoManager.start();
        }

        if (brokerPreOnlineService != null) {
            brokerPreOnlineService.start();
        }

        if (coldDataPullRequestHoldService != null) {
            coldDataPullRequestHoldService.start();
        }

        if (coldDataCgCtrService != null) {
            coldDataCgCtrService.start();
        }
    }

    /**
     * Starts the broker controller.
     * <p>
     * Starts the outer API and the basic services, performs an initial registration when the broker is
     * not isolated and neither DLedger commit log nor duplication is enabled, and then schedules the
     * periodic registration, heartbeat, member-group sync and metadata-refresh tasks.
     *
     * @throws Exception if starting the basic services fails
     */
    public void start() throws Exception {
        // Earliest wall-clock time at which the periodic registration task below will actually register.
        this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();

        // With more than one replica and slave-acting-master enabled, the broker starts out isolated.
        if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster()) {
            isIsolated = true;
        }

        // Start the outer API (the broker also initiates requests to other components).
        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }

        startBasicService();

        // Initial registration is skipped when isolated, or when DLedger commit log or duplication is enabled.
        if (!(isIsolated || this.messageStoreConfig.isEnableDLegerCommitLog() || this.messageStoreConfig.isDuplicationEnable())) {
            changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
            // Register with the name server.
            this.registerBrokerAll(true, false, true);
        }

        // Periodic registration: first run after 10 s; the period is the configured
        // registerNameServerPeriod clamped to the range [10000, 60000] ms.
        final long registerPeriodMillis = Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000));
        scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
            @Override
            public void run0() {
                try {
                    if (System.currentTimeMillis() < shouldStartTime) {
                        BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
                        return;
                    }
                    if (isIsolated) {
                        BrokerController.LOG.info("Skip register for broker is isolated");
                        return;
                    }
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    // Swallow so that a failure does not cancel the periodic task.
                    BrokerController.LOG.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, registerPeriodMillis, TimeUnit.MILLISECONDS));

        if (this.brokerConfig.isEnableSlaveActingMaster()) {
            scheduleSendHeartbeat();

            // Periodically sync the broker member group: first run after 1 s.
            scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
                @Override
                public void run0() {
                    try {
                        BrokerController.this.syncBrokerMemberGroup();
                    } catch (Throwable e) {
                        BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);
                    }
                }
            }, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
        }

        // NOTE(review): when both slave-acting-master and controller mode are enabled,
        // scheduleSendHeartbeat() runs twice and two heartbeat tasks are scheduled; confirm this is intended.
        if (this.brokerConfig.isEnableControllerMode()) {
            scheduleSendHeartbeat();
        }

        if (brokerConfig.isSkipPreOnline()) {
            startServiceWithoutCondition();
        }

        // Refresh metadata through the outer API: first run after 10 s, then every 5 s.
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                this.brokerOuterAPI.refreshMetadata();
            } catch (Exception e) {
                LOG.error("ScheduledTask refresh metadata exception", e);
            }
        }, 10, 5, TimeUnit.SECONDS);
    }

    /**
     * Schedules the periodic heartbeat task on the broker heartbeat executor.
     * <p>
     * The first run happens after 1000 ms; the period is {@code brokerConfig.getBrokerHeartbeatInterval()}
     * milliseconds. The resulting future is tracked in {@code scheduledFutures}.
     */
    protected void scheduleSendHeartbeat() {
        final AbstractBrokerRunnable heartbeatTask = new AbstractBrokerRunnable(this.getBrokerIdentity()) {
            @Override
            public void run0() {
                // An isolated broker does not send heartbeats.
                if (!isIsolated) {
                    try {
                        BrokerController.this.sendHeartbeat();
                    } catch (Exception e) {
                        BrokerController.LOG.error("sendHeartbeat Exception", e);
                    }
                }
            }
        };

        scheduledFutures.add(
            this.brokerHeartbeatExecutorService.scheduleAtFixedRate(
                heartbeatTask, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS));
    }

    /**
     * Registers a single topic's configuration through the outer API.
     * <p>
     * When the broker permission lacks read or write access, a copy of the topic config is registered
     * whose permission is the bitwise AND of the topic permission and the broker permission; otherwise
     * the given config is registered as is.
     *
     * @param topicConfig the topic configuration to register
     */
    public synchronized void registerSingleTopicAll(final TopicConfig topicConfig) {
        final TopicConfig registered;
        if (PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                && PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
            registered = topicConfig;
        } else {
            // Copy the topic config and narrow its permission to what the broker allows.
            registered = new TopicConfig(topicConfig);
            registered.setPerm(topicConfig.getPerm() & this.brokerConfig.getBrokerPermission());
        }

        // Send the registration request with a 3000 timeout argument.
        this.brokerOuterAPI.registerSingleTopicAll(this.brokerConfig.getBrokerName(), registered, 3000);
    }

    /**
     * Incrementally registers a single topic configuration.
     * <p>
     * Wraps the topic config in a singleton list and delegates to
     * {@link #registerIncrementBrokerData(List, DataVersion)}.
     *
     * @param topicConfig the topic configuration to register
     * @param dataVersion the data version to send with the registration
     */
    public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
        final List<TopicConfig> singleTopic = Collections.singletonList(topicConfig);
        registerIncrementBrokerData(singleTopic, dataVersion);
    }

    /**
     * Incrementally registers the given topic configurations.
     * <p>
     * Builds a serialize wrapper holding a copy of each topic config (with its permission narrowed to
     * the broker permission when the broker lacks read or write access) plus the queue-mapping info of
     * those topics that have one, then registers it via {@code doRegisterBrokerAll(true, false, ...)}.
     *
     * @param topicConfigList the topic configurations to register; {@code null} or empty is a no-op
     * @param dataVersion     the data version to send with the registration
     */
    public synchronized void registerIncrementBrokerData(List<TopicConfig> topicConfigList, DataVersion dataVersion) {
        if (topicConfigList == null || topicConfigList.isEmpty()) {
            return;
        }

        // Topic name -> config to register.
        ConcurrentMap<String, TopicConfig> registeredConfigs = topicConfigList.stream()
                .map(source -> {
                    if (PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                            && PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
                        // Broker is readable and writable: register a plain copy.
                        return new TopicConfig(source);
                    }
                    // Otherwise AND the topic permission with the broker permission.
                    return new TopicConfig(source.getTopicName(),
                            source.getReadQueueNums(),
                            source.getWriteQueueNums(),
                            source.getPerm() & this.brokerConfig.getBrokerPermission(),
                            source.getTopicSysFlag());
                })
                .collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity()));

        // Topic name -> cloned queue-mapping info, only for topics that have a mapping.
        Map<String, TopicQueueMappingInfo> mappingInfos = topicConfigList.stream()
                .map(TopicConfig::getTopicName)
                .map(name -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(name))
                        .map(detail -> new AbstractMap.SimpleImmutableEntry<>(name, TopicQueueMappingDetail.cloneAsMappingInfo(detail)))
                        .orElse(null))
                .filter(Objects::nonNull)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        TopicConfigAndMappingSerializeWrapper wrapper = new TopicConfigAndMappingSerializeWrapper();
        wrapper.setDataVersion(dataVersion);
        wrapper.setTopicConfigTable(registeredConfigs);
        if (!mappingInfos.isEmpty()) {
            wrapper.setTopicQueueMappingInfoMap(mappingInfos);
        }

        doRegisterBrokerAll(true, false, wrapper);
    }

    /**
     * Registers this broker and all of its topic configurations with the name server.
     * <p>
     * Collects the broker's topic configs, narrowing each permission to the broker permission when the
     * broker lacks read or write access. When split registration is enabled, a batch is registered each
     * time the collected table reaches {@code splitRegistrationSize}. The remaining configs and the
     * topic-queue mapping info are then registered if split registration is enabled,
     * {@code forceRegister} is set, or {@code needRegister(...)} reports that registration is needed.
     *
     * @param checkOrderConfig passed through to {@code doRegisterBrokerAll}
     * @param oneway           whether the registration request is sent one-way
     * @param forceRegister    when {@code true}, register without consulting {@code needRegister(...)}
     */
    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
        // Current topic config table of this broker: topic name -> topic config.
        ConcurrentMap<String, TopicConfig> topicConfigMap = this.getTopicConfigManager().getTopicConfigTable();
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();

        for (TopicConfig topicConfig : topicConfigMap.values()) {
            if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
                // Broker is not both readable and writable: register a copy whose permission is ANDed
                // with the broker permission. The topic sys flag is carried over, matching what
                // registerIncrementBrokerData does, so the flag is not lost on this path.
                topicConfigTable.put(topicConfig.getTopicName(),
                        new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                                topicConfig.getPerm() & getBrokerConfig().getBrokerPermission(), topicConfig.getTopicSysFlag()));
            } else {
                // Register the existing topic config object unchanged.
                topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            }

            // Split registration: once the collected table reaches the batch threshold, register the
            // batch and start collecting the next one.
            // NOTE(review): the original comment warns that split registration and batched deletion
            // must not both be enabled, as that would lose route information - confirm against the
            // broker configuration documentation.
            if (this.brokerConfig.isEnableSplitRegistration() && topicConfigTable.size() >= this.brokerConfig.getSplitRegistrationSize()) {
                TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildSerializeWrapper(topicConfigTable);
                doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
                topicConfigTable.clear();
            }
        }

        // Clone the topic-queue mapping table: topic name -> mapping info.
        Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream()
                .map(entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue())))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        // Bundle the (remaining) topic configs with the mapping info for registration.
        TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().
                buildSerializeWrapper(topicConfigTable, topicQueueMappingInfoMap);

        if (this.brokerConfig.isEnableSplitRegistration() || forceRegister || needRegister(
                this.brokerConfig.getBrokerClusterName(),          // cluster the broker belongs to
                this.getBrokerAddr(),                              // broker address
                this.brokerConfig.getBrokerName(),                 // broker name
                this.brokerConfig.getBrokerId(),                   // broker id
                this.brokerConfig.getRegisterBrokerTimeoutMills(), // registration timeout
                this.brokerConfig.isInBrokerContainer())           // whether the broker runs in a broker container
        ) {
            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
        }
    }

    /**
     * Sends the registration request to the name server and handles the results.
     * <p>
     * Returns without registering once the controller's {@code shutdown} flag is set.
     *
     * @param checkOrderConfig   passed through to {@code handleRegisterBrokerResult}
     * @param oneway             whether the registration request is sent one-way
     * @param topicConfigWrapper the topic configuration (and, where present, queue-mapping) payload
     */
    protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
                                       TopicConfigSerializeWrapper topicConfigWrapper) {
        if (shutdown) {
            BrokerController.LOG.info("BrokerController#doResterBrokerAll: broker has shutdown, no need to register any more.");
            return;
        }

        final String brokerAddr = this.getBrokerAddr();
        final boolean slaveActingMaster = this.brokerConfig.isEnableSlaveActingMaster();

        List<RegisterBrokerResult> results = this.brokerOuterAPI.registerBrokerAll(
                this.brokerConfig.getBrokerClusterName(),
                brokerAddr,
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.getHAServerAddr(),
                topicConfigWrapper,
                // No extra list entries are sent from here.
                Lists.newArrayList(),
                oneway,
                this.brokerConfig.getRegisterBrokerTimeoutMills(),
                slaveActingMaster,
                this.brokerConfig.isCompressedRegister(),
                // The not-active timeout is only supplied when slave-acting-master is enabled.
                slaveActingMaster ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
                this.getBrokerIdentity());

        // The registration results are handed on for processing.
        handleRegisterBrokerResult(results, checkOrderConfig);
    }

    /**
     * Sends heartbeats on behalf of this broker.
     *
     * <p>When controller mode is enabled a heartbeat is sent to the controller through
     * {@code replicasManager}. When slave-acting-master is enabled a heartbeat is additionally
     * sent through {@code brokerOuterAPI}; the data-version variant is used if
     * {@code compatibleWithOldNameSrv} is set.
     */
    protected void sendHeartbeat() {
        if (this.brokerConfig.isEnableControllerMode()) {
            this.replicasManager.sendHeartbeatToController();
        }

        if (!this.brokerConfig.isEnableSlaveActingMaster()) {
            return;
        }

        if (!this.brokerConfig.isCompatibleWithOldNameSrv()) {
            this.brokerOuterAPI.sendHeartbeat(
                    this.brokerConfig.getBrokerClusterName(),
                    this.getBrokerAddr(),
                    this.brokerConfig.getBrokerName(),
                    this.brokerConfig.getBrokerId(),
                    this.brokerConfig.getSendHeartbeatTimeoutMillis(),
                    this.brokerConfig.isInBrokerContainer());
            return;
        }

        this.brokerOuterAPI.sendHeartbeatViaDataVersion(
                this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.brokerConfig.getSendHeartbeatTimeoutMillis(),
                this.getTopicConfigManager().getDataVersion(),
                this.brokerConfig.isInBrokerContainer());
    }

    /**
     * Refreshes {@code brokerMemberGroup} through {@code BrokerOuterAPI#syncBrokerMemberGroup}
     * and pushes the outcome to the message store and the min-broker bookkeeping.
     *
     * <p>A failed lookup is logged as an error and ends the refresh. A {@code null} or empty
     * group is logged as a warning and also ends the refresh. The min-broker update is skipped
     * while this broker is isolated.
     */
    protected void syncBrokerMemberGroup() {
        try {
            brokerMemberGroup = this.getBrokerOuterAPI()
                    .syncBrokerMemberGroup(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.brokerConfig.isCompatibleWithOldNameSrv());
        } catch (Exception e) {
            BrokerController.LOG.error("syncBrokerMemberGroup from namesrv failed, ", e);
            return;
        }

        final boolean noMembers = brokerMemberGroup == null || brokerMemberGroup.getBrokerAddrs().isEmpty();
        if (noMembers) {
            BrokerController.LOG.warn("Couldn't find any broker member from namesrv in {}/{}", this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName());
            return;
        }

        final int aliveReplicas = calcAliveBrokerNumInGroup(brokerMemberGroup.getBrokerAddrs());
        this.messageStore.setAliveReplicaNumInGroup(aliveReplicas);

        if (this.isIsolated) {
            return;
        }
        final long smallestBrokerId = brokerMemberGroup.minimumBrokerId();
        this.updateMinBroker(smallestBrokerId, brokerMemberGroup.getBrokerAddrs().get(smallestBrokerId));
    }

    /**
     * Counts the brokers in the group, making sure this broker is included.
     *
     * @param brokerAddrTable broker id to address table of the group
     * @return the table size if it already contains this broker's id, otherwise the size plus one
     */
    private int calcAliveBrokerNumInGroup(Map<Long, String> brokerAddrTable) {
        final boolean selfListed = brokerAddrTable.containsKey(this.brokerConfig.getBrokerId());
        return selfListed ? brokerAddrTable.size() : brokerAddrTable.size() + 1;
    }

    /**
     * Applies the outcome of a broker registration.
     *
     * <p>Only the first non-null entry of the list is used; if every entry is {@code null}
     * nothing happens.
     *
     * @param registerBrokerResultList results returned by the registration call
     * @param checkOrderConfig         when {@code true}, the order-topic configuration is updated
     *                                 from the KV table of the applied result
     */
    protected void handleRegisterBrokerResult(List<RegisterBrokerResult> registerBrokerResultList,
                                              boolean checkOrderConfig) {
        RegisterBrokerResult firstResult = null;
        for (RegisterBrokerResult candidate : registerBrokerResultList) {
            if (candidate != null) {
                firstResult = candidate;
                break;
            }
        }
        if (firstResult == null) {
            return;
        }

        // The store's HA/master addresses are refreshed only when periodic updates are enabled
        // and the result carries an HA server address.
        if (this.updateMasterHAServerAddrPeriodically && firstResult.getHaServerAddr() != null) {
            this.messageStore.updateHaMasterAddress(firstResult.getHaServerAddr());
            this.messageStore.updateMasterAddress(firstResult.getMasterAddr());
        }

        // The master address from the result is always handed to the slave synchronizer.
        this.slaveSynchronize.setMasterAddr(firstResult.getMasterAddr());

        if (checkOrderConfig) {
            this.getTopicConfigManager().updateOrderTopicConfig(firstResult.getKvTable());
        }
    }

    /**
     * Decides whether this broker has to (re-)register, based on the flags returned by
     * {@code brokerOuterAPI.needRegister}.
     *
     * <p>Per the original notes, that call asks every name server to compare its data version
     * with the broker's, yielding one flag per name server. A single {@code true} flag is
     * enough to require registration.
     *
     * @param clusterName         cluster name
     * @param brokerAddr          broker address
     * @param brokerName          broker name
     * @param brokerId            broker id
     * @param timeoutMills        request timeout
     * @param isInBrokerContainer whether the broker runs inside a broker container
     * @return {@code true} if at least one returned flag is {@code true}
     */
    private boolean needRegister(final String clusterName,
                                 final String brokerAddr,
                                 final String brokerName,
                                 final long brokerId,
                                 final int timeoutMills,
                                 final boolean isInBrokerContainer) {

        // Wrapper built by the topic config manager; it is sent along with the query.
        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

        List<Boolean> changeFlags = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills, isInBrokerContainer);

        // One reported change is sufficient.
        for (Boolean changed : changeFlags) {
            if (changed) {
                return true;
            }
        }
        return false;
    }

    /**
     * Puts this broker into service using the supplied min-broker information.
     *
     * <p>Records the min broker id and address, enables the special services only when this
     * broker's id equals {@code minBrokerId}, registers the broker and finally clears the
     * isolated flag.
     *
     * @param minBrokerId   smallest broker id in the group
     * @param minBrokerAddr address of the broker with that id
     */
    public void startService(long minBrokerId, String minBrokerAddr) {
        BrokerController.LOG.info("{} start service, min broker id is {}, min broker addr: {}",
                this.brokerConfig.getCanonicalName(), minBrokerId, minBrokerAddr);

        this.minBrokerIdInGroup = minBrokerId;
        this.minBrokerAddrInGroup = minBrokerAddr;

        final boolean isMinBroker = this.brokerConfig.getBrokerId() == minBrokerId;
        this.changeSpecialServiceStatus(isMinBroker);

        final boolean forceRegister = brokerConfig.isForceRegister();
        this.registerBrokerAll(true, false, forceRegister);

        this.isIsolated = false;
    }

    /**
     * Puts this broker into service without any min-broker information.
     *
     * <p>The special services are enabled only when this broker's id equals
     * {@code MixAll.MASTER_ID}; afterwards the broker is registered and the isolated flag cleared.
     */
    public void startServiceWithoutCondition() {
        BrokerController.LOG.info("{} start service", this.brokerConfig.getCanonicalName());

        final boolean isMaster = this.brokerConfig.getBrokerId() == MixAll.MASTER_ID;
        this.changeSpecialServiceStatus(isMaster);

        final boolean forceRegister = brokerConfig.isForceRegister();
        this.registerBrokerAll(true, false, forceRegister);

        this.isIsolated = false;
    }

    /**
     * Takes this broker out of service: marks it isolated and switches the special services off.
     */
    public void stopService() {
        final String canonicalName = this.getBrokerConfig().getCanonicalName();
        BrokerController.LOG.info("{} stop service", canonicalName);

        this.isIsolated = true;
        this.changeSpecialServiceStatus(false);
    }

    /**
     * Reports whether the special services are running.
     *
     * @return {@code true} if both the schedule service and the transaction check service are
     *         started, or otherwise if an ack message processor exists and reports its pop
     *         revive service as running
     */
    public boolean isSpecialServiceRunning() {
        return (isScheduleServiceStart() && isTransactionCheckServiceStart())
                || (this.ackMessageProcessor != null && this.ackMessageProcessor.isPopReviveServiceRunning());
    }

    /**
     * Reacts to the master going offline.
     *
     * <p>Closes the channels to the known master address (regular and VIP channel), then clears
     * the master address on the slave synchronizer and the HA master address on the message store.
     */
    private void onMasterOffline() {
        final String knownMasterAddr = this.slaveSynchronize.getMasterAddr();
        if (knownMasterAddr != null) {
            // Both the regular address and its VIP channel variant are closed.
            final List<String> masterChannels =
                    Arrays.asList(knownMasterAddr, MixAll.brokerVIPChannel(true, knownMasterAddr));
            this.brokerOuterAPI.getRemotingClient().closeChannels(masterChannels);
        }

        // With no master available, synchronization targets are reset.
        this.slaveSynchronize.setMasterAddr(null);
        this.messageStore.updateHaMasterAddress(null);
    }

    /**
     * Reacts to the master coming online.
     *
     * <p>If no HA address was supplied, or the master flush offset still has to be synchronized
     * (store reports offset 0 and {@code syncMasterFlushOffsetWhenStartup} is enabled), the HA
     * info is fetched from the master and applied. A supplied HA address is then set on the
     * message store, and the HA client is woken up in every case.
     *
     * @param masterAddr   address of the master broker
     * @param masterHaAddr HA address of the master, may be {@code null}
     */
    private void onMasterOnline(String masterAddr, String masterHaAddr) {
        final boolean needSyncMasterFlushOffset = this.messageStore.getMasterFlushedOffset() == 0
                && this.messageStoreConfig.isSyncMasterFlushOffsetWhenStartup();
        final boolean haAddrMissing = masterHaAddr == null;

        if (haAddrMissing || needSyncMasterFlushOffset) {
            try {
                BrokerSyncInfo syncInfo = this.brokerOuterAPI.retrieveBrokerHaInfo(masterAddr);

                if (needSyncMasterFlushOffset) {
                    LOG.info("Set master flush offset in slave to {}", syncInfo.getMasterFlushOffset());
                    this.messageStore.setMasterFlushedOffset(syncInfo.getMasterFlushOffset());
                }

                if (haAddrMissing) {
                    this.messageStore.updateHaMasterAddress(syncInfo.getMasterHaAddress());
                    this.messageStore.updateMasterAddress(syncInfo.getMasterAddress());
                }
            } catch (Exception e) {
                // Failure is logged only; the remaining steps still run.
                // NOTE(review): the exception is the sole argument for the "{}" placeholder - confirm the logger output is as intended.
                LOG.error("retrieve master ha info exception, {}", e);
            }
        }

        // An explicitly supplied HA address is applied to the store.
        if (!haAddrMissing) {
            this.messageStore.updateHaMasterAddress(masterHaAddr);
        }

        this.messageStore.wakeupHAClient();
    }

    /**
     * Handles a change of the minimum broker id in this broker's group.
     *
     * <p>Logs the change, stores the new min broker id and address, switches the special
     * services on or off depending on whether this broker now holds the min id, and then
     * runs the master-offline / master-online handling where applicable.
     *
     * @param minBrokerId       the new minimum broker id
     * @param minBrokerAddr     address of the broker with the new minimum id
     * @param offlineBrokerAddr address of a broker that went offline, may be {@code null}
     * @param masterHaAddr      HA address of the master, may be {@code null}
     */
    private void onMinBrokerChange(long minBrokerId, String minBrokerAddr, String offlineBrokerAddr,
                                   String masterHaAddr) {
        LOG.info("Min broker changed, old: {}-{}, new {}-{}",
                this.minBrokerIdInGroup, this.minBrokerAddrInGroup, minBrokerId, minBrokerAddr);

        // Remember the new minimum broker id.
        this.minBrokerIdInGroup = minBrokerId;
        // Remember the address of the broker with the new minimum id.
        this.minBrokerAddrInGroup = minBrokerAddr;

        // The special services run only if this broker now holds the minimum id.
        this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == this.minBrokerIdInGroup);

        // An offline address equal to the synchronizer's master address means the master went offline.
        if (offlineBrokerAddr != null && offlineBrokerAddr.equals(this.slaveSynchronize.getMasterAddr())) {
            // Handle the master going offline.
            onMasterOffline();
        }

        // The new minimum id is the master id and its address is known: the master is online.
        if (minBrokerId == MixAll.MASTER_ID && minBrokerAddr != null) {
            // Handle the master coming online.
            onMasterOnline(minBrokerAddr, masterHaAddr);
        }

        // When the stored minimum id is the master id, tell the pull-request hold service that the master is online.
        if (this.minBrokerIdInGroup == MixAll.MASTER_ID) {
            this.pullRequestHoldService.notifyMasterOnline();
        }
    }

    /**
     * Updates the min-broker information if it changed, without waiting for the lock.
     *
     * <p>Only acts when slave-acting-master is enabled and this broker's id is not
     * {@code MixAll.MASTER_ID}. If the lock cannot be acquired immediately the update is skipped.
     * When the new min id is larger than the stored one, the previously stored min broker
     * address is passed on as the offline broker address.
     *
     * @param minBrokerId   the new minimum broker id
     * @param minBrokerAddr address of the broker with that id
     */
    public void updateMinBroker(long minBrokerId, String minBrokerAddr) {
        if (!brokerConfig.isEnableSlaveActingMaster() || brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
            return;
        }
        if (!lock.tryLock()) {
            return;
        }
        try {
            if (minBrokerId == this.minBrokerIdInGroup) {
                return;
            }
            // A larger min id implies the former min broker is gone.
            final String offlineBrokerAddr =
                    minBrokerId > this.minBrokerIdInGroup ? this.minBrokerAddrInGroup : null;
            onMinBrokerChange(minBrokerId, minBrokerAddr, offlineBrokerAddr, null);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Updates the min-broker information if it changed, waiting up to three seconds for the lock.
     *
     * <p>Only acts when slave-acting-master is enabled and this broker's id is not
     * {@code MixAll.MASTER_ID}. If the lock is not obtained within the timeout the update is
     * skipped. If the waiting thread is interrupted, the error is logged and the thread's
     * interrupt status is restored so that callers can still observe the interruption.
     *
     * @param minBrokerId       the new minimum broker id
     * @param minBrokerAddr     address of the broker with the new minimum id
     * @param offlineBrokerAddr address of a broker that went offline, may be {@code null}
     * @param masterHaAddr      HA address of the master, may be {@code null}
     */
    public void updateMinBroker(long minBrokerId, String minBrokerAddr, String offlineBrokerAddr,
                                String masterHaAddr) {
        if (brokerConfig.isEnableSlaveActingMaster() && brokerConfig.getBrokerId() != MixAll.MASTER_ID) {
            try {
                if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
                    try {
                        // Nothing to do unless the minimum broker id actually changed.
                        if (minBrokerId != this.minBrokerIdInGroup) {
                            onMinBrokerChange(minBrokerId, minBrokerAddr, offlineBrokerAddr, masterHaAddr);
                        }
                    } finally {
                        lock.unlock();
                    }

                }
            } catch (InterruptedException e) {
                LOG.error("Update min broker error, {}", e);
                // tryLock cleared the interrupt flag when it threw; set it again so the
                // interruption is not silently lost.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Switches the special services on or off.
     *
     * <p>Notifies every non-null attached plugin, then updates the schedule service, the
     * transaction check service and, if an ack message processor exists, its pop revive service.
     *
     * @param shouldStart {@code true} to start the services, {@code false} to stop them
     */
    public void changeSpecialServiceStatus(boolean shouldStart) {
        // Attached plugins are informed first.
        for (BrokerAttachedPlugin plugin : brokerAttachedPlugins) {
            if (plugin == null) {
                continue;
            }
            plugin.statusChanged(shouldStart);
        }

        changeScheduleServiceStatus(shouldStart);
        changeTransactionCheckServiceStatus(shouldStart);

        if (this.ackMessageProcessor == null) {
            return;
        }
        LOG.info("Set PopReviveService Status to {}", shouldStart);
        this.ackMessageProcessor.setPopReviveServiceStatus(shouldStart);
    }

    /**
     * Starts or shuts down the transactional message check service when the requested state
     * differs from the recorded one; otherwise does nothing.
     *
     * @param shouldStart {@code true} to start the service, {@code false} to shut it down
     */
    private synchronized void changeTransactionCheckServiceStatus(boolean shouldStart) {
        if (isTransactionCheckServiceStart == shouldStart) {
            return;
        }

        LOG.info("TransactionCheckService status changed to {}", shouldStart);
        if (!shouldStart) {
            this.transactionalMessageCheckService.shutdown(true);
        } else {
            this.transactionalMessageCheckService.start();
        }
        isTransactionCheckServiceStart = shouldStart;
    }

    /**
     * Starts or stops the schedule message service when the requested state differs from the
     * recorded one; otherwise does nothing. The timer message store, if present, is told
     * whether it should run dequeue.
     *
     * @param shouldStart {@code true} to start the service, {@code false} to stop it
     */
    public synchronized void changeScheduleServiceStatus(boolean shouldStart) {
        if (isScheduleServiceStart == shouldStart) {
            return;
        }

        LOG.info("ScheduleServiceStatus changed to {}", shouldStart);
        if (!shouldStart) {
            this.scheduleMessageService.stop();
        } else {
            this.scheduleMessageService.start();
        }
        isScheduleServiceStart = shouldStart;

        if (timerMessageStore == null) {
            return;
        }
        timerMessageStore.setShouldRunningDequeue(shouldStart);
    }

    /**
     * Looks up the message store for a broker name.
     *
     * @param brokerName broker name to look up
     * @return this controller's message store if {@code brokerName} equals the configured
     *         broker name, otherwise {@code null}
     */
    public MessageStore getMessageStoreByBrokerName(String brokerName) {
        final boolean isThisBroker = this.brokerConfig.getBrokerName().equals(brokerName);
        return isThisBroker ? this.getMessageStore() : null;
    }

    /**
     * Builds a new {@link BrokerIdentity} for this broker.
     *
     * <p>With the DLedger commit log enabled, the broker id is parsed from the DLedger self id
     * with its first character removed; otherwise the configured broker id is used.
     *
     * @return a freshly created identity object
     */
    public BrokerIdentity getBrokerIdentity() {
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            return new BrokerIdentity(
                    brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(),
                    brokerConfig.getBrokerId(), brokerConfig.isInBrokerContainer());
        }

        // Everything after the first character of the self id is interpreted as the numeric id.
        final String dLegerSelfId = messageStoreConfig.getdLegerSelfId();
        final int dLegerBrokerId = Integer.parseInt(dLegerSelfId.substring(1));
        return new BrokerIdentity(
                brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(),
                dLegerBrokerId, brokerConfig.isInBrokerContainer());
    }

    /**
     * Returns this controller's {@link TopicConfigManager}.
     */
    public TopicConfigManager getTopicConfigManager() {
        return this.topicConfigManager;
    }

    /**
     * Replaces this controller's {@link TopicConfigManager}.
     *
     * @param topicConfigManager the manager to use from now on
     */
    public void setTopicConfigManager(TopicConfigManager topicConfigManager) {
        this.topicConfigManager = topicConfigManager;
    }

    /**
     * Returns this controller's {@link TopicQueueMappingManager}.
     */
    public TopicQueueMappingManager getTopicQueueMappingManager() {
        return this.topicQueueMappingManager;
    }

    /**
     * Builds the HA server address of this broker.
     *
     * @return the configured {@code brokerIP2} and the configured HA listen port joined by a colon
     */
    public String getHAServerAddr() {
        final String haHost = this.brokerConfig.getBrokerIP2();
        return haHost + ":" + this.messageStoreConfig.getHaListenPort();
    }

    /**
     * Returns this controller's {@link RebalanceLockManager}.
     */
    public RebalanceLockManager getRebalanceLockManager() {
        return this.rebalanceLockManager;
    }

    /**
     * Returns this controller's {@link SlaveSynchronize} instance.
     */
    public SlaveSynchronize getSlaveSynchronize() {
        return this.slaveSynchronize;
    }

    /**
     * Returns the {@code scheduledExecutorService} held by this controller.
     */
    public ScheduledExecutorService getScheduledExecutorService() {
        return this.scheduledExecutorService;
    }

    /**
     * Returns the {@code pullMessageExecutor} held by this controller.
     */
    public ExecutorService getPullMessageExecutor() {
        return this.pullMessageExecutor;
    }

    /**
     * Returns the {@code putMessageFutureExecutor} held by this controller.
     */
    public ExecutorService getPutMessageFutureExecutor() {
        return this.putMessageFutureExecutor;
    }

    /**
     * Replaces the {@code pullMessageExecutor} held by this controller.
     *
     * @param pullMessageExecutor the executor to use from now on
     */
    public void setPullMessageExecutor(ExecutorService pullMessageExecutor) {
        this.pullMessageExecutor = pullMessageExecutor;
    }

    /**
     * Returns the {@code sendThreadPoolQueue} held by this controller.
     */
    public BlockingQueue<Runnable> getSendThreadPoolQueue() {
        return this.sendThreadPoolQueue;
    }

    /**
     * Returns the {@code ackThreadPoolQueue} held by this controller.
     */
    public BlockingQueue<Runnable> getAckThreadPoolQueue() {
        return this.ackThreadPoolQueue;
    }

    /**
     * Returns this controller's {@link BrokerStatsManager}.
     */
    public BrokerStatsManager getBrokerStatsManager() {
        return this.brokerStatsManager;
    }

    /**
     * Returns the list of registered send-message hooks. The internal list itself is returned,
     * not a copy.
     */
    public List<SendMessageHook> getSendMessageHookList() {
        return this.sendMessageHookList;
    }

    /**
     * Adds a hook to the send-message hook list and logs its name.
     *
     * @param hook the hook to register
     */
    public void registerSendMessageHook(final SendMessageHook hook) {
        sendMessageHookList.add(hook);
        LOG.info("register SendMessageHook Hook, {}", hook.hookName());
    }

    /**
     * Returns the list of registered consume-message hooks. The internal list itself is
     * returned, not a copy.
     */
    public List<ConsumeMessageHook> getConsumeMessageHookList() {
        return this.consumeMessageHookList;
    }

    /**
     * Adds a hook to the consume-message hook list and logs its name.
     *
     * @param hook the hook to register
     */
    public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
        consumeMessageHookList.add(hook);
        LOG.info("register ConsumeMessageHook Hook, {}", hook.hookName());
    }

    /**
     * Registers an RPC hook on the remoting server and on the fast remoting server, in that order.
     *
     * @param rpcHook the hook to register
     */
    public void registerServerRPCHook(RPCHook rpcHook) {
        final RemotingServer primaryServer = getRemotingServer();
        primaryServer.registerRPCHook(rpcHook);
        fastRemotingServer.registerRPCHook(rpcHook);
    }

    /**
     * Returns this controller's {@link RemotingServer}.
     */
    public RemotingServer getRemotingServer() {
        return this.remotingServer;
    }

    /**
     * Replaces this controller's {@link RemotingServer}.
     *
     * @param remotingServer the server to use from now on
     */
    public void setRemotingServer(RemotingServer remotingServer) {
        this.remotingServer = remotingServer;
    }

    /**
     * Returns the {@code remotingServerStartLatch} held by this controller.
     */
    public CountDownLatch getRemotingServerStartLatch() {
        return this.remotingServerStartLatch;
    }

    /**
     * Replaces the {@code remotingServerStartLatch} held by this controller.
     *
     * @param remotingServerStartLatch the latch to use from now on
     */
    public void setRemotingServerStartLatch(CountDownLatch remotingServerStartLatch) {
        this.remotingServerStartLatch = remotingServerStartLatch;
    }

    /**
     * Registers an RPC hook on the {@link BrokerOuterAPI} obtained from {@link #getBrokerOuterAPI()}.
     *
     * @param rpcHook the hook to register
     */
    public void registerClientRPCHook(RPCHook rpcHook) {
        final BrokerOuterAPI outerApi = getBrokerOuterAPI();
        outerApi.registerRPCHook(rpcHook);
    }

    /**
     * Returns this controller's {@link BrokerOuterAPI}.
     */
    public BrokerOuterAPI getBrokerOuterAPI() {
        return this.brokerOuterAPI;
    }

    /**
     * Returns the {@code storeHost} address held by this controller.
     */
    public InetSocketAddress getStoreHost() {
        return this.storeHost;
    }

    /**
     * Replaces the {@code storeHost} address held by this controller.
     *
     * @param storeHost the address to use from now on
     */
    public void setStoreHost(InetSocketAddress storeHost) {
        this.storeHost = storeHost;
    }

    /**
     * Returns this controller's {@link Configuration}.
     */
    public Configuration getConfiguration() {
        return configuration;
    }

    /**
     * Returns the {@code heartbeatThreadPoolQueue} held by this controller.
     */
    public BlockingQueue<Runnable> getHeartbeatThreadPoolQueue() {
        return this.heartbeatThreadPoolQueue;
    }

    /**
     * Returns this controller's {@link TransactionalMessageCheckService}.
     */
    public TransactionalMessageCheckService getTransactionalMessageCheckService() {
        return this.transactionalMessageCheckService;
    }

    /**
     * Replaces this controller's {@link TransactionalMessageCheckService}.
     *
     * @param transactionalMessageCheckService the service to use from now on
     */
    public void setTransactionalMessageCheckService(
            TransactionalMessageCheckService transactionalMessageCheckService) {
        this.transactionalMessageCheckService = transactionalMessageCheckService;
    }

    /**
     * Returns this controller's {@link TransactionalMessageService}.
     */
    public TransactionalMessageService getTransactionalMessageService() {
        return this.transactionalMessageService;
    }

    /**
     * Replaces this controller's {@link TransactionalMessageService}.
     *
     * @param transactionalMessageService the service to use from now on
     */
    public void setTransactionalMessageService(TransactionalMessageService transactionalMessageService) {
        this.transactionalMessageService = transactionalMessageService;
    }

    /**
     * Returns this controller's {@link AbstractTransactionalMessageCheckListener}.
     */
    public AbstractTransactionalMessageCheckListener getTransactionalMessageCheckListener() {
        return this.transactionalMessageCheckListener;
    }

    /**
     * Replaces this controller's {@link AbstractTransactionalMessageCheckListener}.
     *
     * @param transactionalMessageCheckListener the listener to use from now on
     */
    public void setTransactionalMessageCheckListener(
            AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
        this.transactionalMessageCheckListener = transactionalMessageCheckListener;
    }

    /**
     * Returns the {@code endTransactionThreadPoolQueue} held by this controller.
     */
    public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
        return this.endTransactionThreadPoolQueue;
    }

    /**
     * Returns the map of access validators keyed by class. The internal map itself is
     * returned, not a copy.
     */
    public Map<Class, AccessValidator> getAccessValidatorMap() {
        return this.accessValidatorMap;
    }

    /**
     * Returns the {@code sendMessageExecutor} held by this controller.
     */
    public ExecutorService getSendMessageExecutor() {
        return this.sendMessageExecutor;
    }

    /**
     * Returns this controller's {@link SendMessageProcessor}.
     */
    public SendMessageProcessor getSendMessageProcessor() {
        return this.sendMessageProcessor;
    }

    /**
     * Returns this controller's {@link QueryAssignmentProcessor}.
     */
    public QueryAssignmentProcessor getQueryAssignmentProcessor() {
        return this.queryAssignmentProcessor;
    }

    /**
     * Returns this controller's {@link TopicQueueMappingCleanService}.
     */
    public TopicQueueMappingCleanService getTopicQueueMappingCleanService() {
        return this.topicQueueMappingCleanService;
    }

    /**
     * Returns the {@code adminBrokerExecutor} held by this controller.
     */
    public ExecutorService getAdminBrokerExecutor() {
        return this.adminBrokerExecutor;
    }

    /**
     * Returns the {@code litePullThreadPoolQueue} held by this controller.
     */
    public BlockingQueue<Runnable> getLitePullThreadPoolQueue() {
        return this.litePullThreadPoolQueue;
    }

    /**
     * Returns this controller's {@link ShutdownHook}.
     */
    public ShutdownHook getShutdownHook() {
        return this.shutdownHook;
    }

    /**
     * Replaces this controller's {@link ShutdownHook}.
     *
     * @param shutdownHook the hook to use from now on
     */
    public void setShutdownHook(ShutdownHook shutdownHook) {
        this.shutdownHook = shutdownHook;
    }

    /**
     * Returns this broker's own configured broker id.
     *
     * <p>NOTE(review): despite its name, this accessor does not return the
     * {@code minBrokerIdInGroup} field that the min-broker update methods maintain; it returns
     * {@code brokerConfig.getBrokerId()}. Confirm against callers whether this is intentional
     * before changing it.
     */
    public long getMinBrokerIdInGroup() {
        return this.brokerConfig.getBrokerId();
    }

    /**
     * Returns this controller if it is the master broker.
     *
     * @return this instance when the configured broker id equals {@code MixAll.MASTER_ID},
     *         otherwise {@code null}
     */
    public BrokerController peekMasterBroker() {
        if (brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
            return this;
        }
        return null;
    }

    /**
     * Returns the {@link BrokerMemberGroup} currently held by this controller.
     */
    public BrokerMemberGroup getBrokerMemberGroup() {
        return brokerMemberGroup;
    }

    /**
     * Returns the listen port from the netty server configuration.
     */
    public int getListenPort() {
        return nettyServerConfig.getListenPort();
    }

    /**
     * Returns the list of broker-attached plugins. The internal list itself is returned,
     * not a copy.
     */
    public List<BrokerAttachedPlugin> getBrokerAttachedPlugins() {
        return this.brokerAttachedPlugins;
    }

    /**
     * Returns this controller's {@link EscapeBridge}.
     */
    public EscapeBridge getEscapeBridge() {
        return this.escapeBridge;
    }

    /**
     * Returns the value of the {@code shouldStartTime} field.
     */
    public long getShouldStartTime() {
        return this.shouldStartTime;
    }

    /**
     * Returns this controller's {@link BrokerPreOnlineService}.
     */
    public BrokerPreOnlineService getBrokerPreOnlineService() {
        return this.brokerPreOnlineService;
    }

    /**
     * Returns this controller's {@link EndTransactionProcessor}.
     */
    public EndTransactionProcessor getEndTransactionProcessor() {
        return this.endTransactionProcessor;
    }

    /**
     * Returns the recorded state of the schedule service, as last set by
     * {@code changeScheduleServiceStatus}.
     */
    public boolean isScheduleServiceStart() {
        return this.isScheduleServiceStart;
    }

    /**
     * Returns the recorded state of the transaction check service, as last set by
     * {@code changeTransactionCheckServiceStatus}.
     */
    public boolean isTransactionCheckServiceStart() {
        return this.isTransactionCheckServiceStart;
    }

    /**
     * Returns this controller's {@link ScheduleMessageService}.
     */
    public ScheduleMessageService getScheduleMessageService() {
        return this.scheduleMessageService;
    }

    /**
     * Returns this controller's {@link ReplicasManager}.
     */
    public ReplicasManager getReplicasManager() {
        return this.replicasManager;
    }

    /**
     * Sets the isolated flag of this controller.
     *
     * @param isolated the new value of the flag
     */
    public void setIsolated(boolean isolated) {
        isIsolated = isolated;
    }

    /**
     * Returns the isolated flag of this controller.
     */
    public boolean isIsolated() {
        return isIsolated;
    }

    /**
     * Returns this controller's {@link TimerCheckpoint}.
     */
    public TimerCheckpoint getTimerCheckpoint() {
        return this.timerCheckpoint;
    }

    /**
     * Returns this controller's {@link TopicRouteInfoManager}.
     */
    public TopicRouteInfoManager getTopicRouteInfoManager() {
        return topicRouteInfoManager;
    }

    /**
     * Returns the {@code clientManagerThreadPoolQueue} held by this controller.
     */
    public BlockingQueue<Runnable> getClientManagerThreadPoolQueue() {
        return this.clientManagerThreadPoolQueue;
    }

    /**
     * Returns the {@code consumerManagerThreadPoolQueue} held by this controller.
     */
    public BlockingQueue<Runnable> getConsumerManagerThreadPoolQueue() {
        return this.consumerManagerThreadPoolQueue;
    }

    /**
     * Returns the {@code putThreadPoolQueue} held by this controller. Note that the backing
     * field is named differently from this accessor.
     */
    public BlockingQueue<Runnable> getAsyncPutThreadPoolQueue() {
        return this.putThreadPoolQueue;
    }

    /**
     * Returns the {@code replyThreadPoolQueue} held by this controller.
     */
    public BlockingQueue<Runnable> getReplyThreadPoolQueue() {
        return this.replyThreadPoolQueue;
    }

    /**
     * Returns the {@code adminBrokerThreadPoolQueue} held by this controller.
     */
    public BlockingQueue<Runnable> getAdminBrokerThreadPoolQueue() {
        return this.adminBrokerThreadPoolQueue;
    }

    /**
     * Returns this controller's {@link ColdDataPullRequestHoldService}.
     */
    public ColdDataPullRequestHoldService getColdDataPullRequestHoldService() {
        return this.coldDataPullRequestHoldService;
    }

    /**
     * Replaces this controller's {@link ColdDataPullRequestHoldService}.
     *
     * @param coldDataPullRequestHoldService the service to use from now on
     */
    public void setColdDataPullRequestHoldService(
            ColdDataPullRequestHoldService coldDataPullRequestHoldService) {
        this.coldDataPullRequestHoldService = coldDataPullRequestHoldService;
    }

    /**
     * Returns this controller's {@link ColdDataCgCtrService}.
     */
    public ColdDataCgCtrService getColdDataCgCtrService() {
        return this.coldDataCgCtrService;
    }

    /**
     * Replaces this controller's {@link ColdDataCgCtrService}.
     *
     * @param coldDataCgCtrService the service to use from now on
     */
    public void setColdDataCgCtrService(ColdDataCgCtrService coldDataCgCtrService) {
        this.coldDataCgCtrService = coldDataCgCtrService;
    }

    /**
     * Reports whether the configured store type is the RocksDB store.
     *
     * @return {@code true} if the configured store type equals, ignoring case, the store type
     *         string of {@code StoreType.DEFAULT_ROCKSDB}
     */
    public boolean isEnableRocksDBStore() {
        final String rocksDbStoreType = StoreType.DEFAULT_ROCKSDB.getStoreType();
        final String configuredStoreType = this.messageStoreConfig.getStoreType();
        return rocksDbStoreType.equalsIgnoreCase(configuredStoreType);
    }
}
